import warnings
from typing import Dict, List, Union
from datetime import date as dt_date, datetime, timedelta
from dateutil.tz import gettz
from onetick.py import configuration, utils
import onetick.py as otp
from onetick.py.core import db_constants
import onetick.query as otq
import pandas as pd
def _datetime2date(dt: Union[dt_date, datetime]) -> dt_date:
    """
    Drop any time-of-day component and return a plain ``datetime.date``.

    Accepts any object exposing ``year``, ``month`` and ``day`` attributes,
    e.g. ``datetime.datetime`` or ``datetime.date``. Needed because
    ``datetime`` and ``date`` instances cannot be compared with each other.
    """
    year, month, day = dt.year, dt.month, dt.day
    return dt_date(year, month, day)
class DB:
    """
    Handle for a single OneTick database, queried within a OneTick ``context``.

    Instances compare and hash by their string name, so ``DB('X') == 'X'`` holds.
    """

    def __init__(self, name, context=utils.default):
        self.name = name
        # fall back to the globally configured context when none is given
        if context is utils.default or context is None:
            self.context = otp.config.context
        else:
            self.context = context
        # lazily filled by _set_intervals() with merged (start, end) locator ranges
        self._locator_date_ranges = None

    def __eq__(self, obj):
        return str(self) == str(obj)

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None (unhashable instances);
        # hash by name so that hashing agrees with the name-based __eq__.
        return hash(str(self))

    def __lt__(self, obj):
        return str(self) < str(obj)

    def __str__(self):
        return self.name

    def _set_intervals(self):
        """
        Find all date ranges configured in the database locator and cache them.

        These intervals are required to find all possible dates with data.
        They are read with DB_SHOW_CONFIGURED_TIME_RANGES over a constant,
        very wide query range. The values are converted to naive GMT
        datetimes. Ranges where one ends exactly where the next one starts
        are merged, to reduce the number of queries made by :meth:`dates`.

        The result is stored in ``self._locator_date_ranges`` and computed
        only once per instance.
        """
        if self._locator_date_ranges is not None:
            return
        graph = otq.GraphQuery(otq.DbShowConfiguredTimeRanges(db_name=self.name).tick_type("ANY")
                               >> otq.Table(fields='long START_DATE, long END_DATE'))
        result = otp.run(graph,
                         symbols=f'{self.name}::',
                         # start and end times don't matter for this query, use some constants
                         start=db_constants.DEFAULT_START_DATE,
                         end=db_constants.DEFAULT_END_DATE,
                         # GMT, because start/end timestamp in locator are in GMT
                         timezone='GMT',
                         context=self.context)
        date_ranges = []
        tz_gmt = gettz('GMT')
        for inx in range(len(result)):
            start_date = result['START_DATE'][inx]
            if start_date < 0:
                # On Windows datetime.fromtimestamp throws an OSError for negative values
                start_date = 0
            # values are epoch milliseconds; fromtimestamp() expects seconds
            start = datetime.fromtimestamp(start_date / 1000, tz=tz_gmt).replace(tzinfo=None)
            end = datetime.fromtimestamp(result['END_DATE'][inx] / 1000, tz=tz_gmt).replace(tzinfo=None)
            date_ranges.append((start, end))

        # merge adjacent ranges to reduce the number of queries for `dates` later
        merged = []
        for t_start, t_end in date_ranges:
            if merged and merged[-1][1] == t_start:
                merged[-1] = (merged[-1][0], t_end)
            else:
                merged.append((t_start, t_end))
        self._locator_date_ranges = merged

    def _show_loaded_time_ranges(self, start, end, only_last=False):
        """
        Return the dates that have loaded partitions between ``start`` and ``end``.

        Runs DB_SHOW_LOADED_TIME_RANGES in GMT, keeping only records with
        ``NUM_LOADED_PARTITIONS > 0``. Every START_DATE..END_DATE record
        (``%Y%m%d`` strings) is expanded into individual ``datetime.date``
        values.

        If ``only_last`` is set, only the last record is requested. The result
        is then a one-element list holding that record's END_DATE, or an empty
        list when there are no records.
        """
        eps = otq.DbShowLoadedTimeRanges(use_cache=True).tick_type('ANY')
        eps = eps >> otq.WhereClause(where='NUM_LOADED_PARTITIONS > 0')
        if only_last:
            eps = eps >> otq.LastTick()
        eps = eps >> otq.Table(fields='string START_DATE, string END_DATE')
        graph = otq.GraphQuery(eps)
        result = otp.run(graph,
                         symbols=f'{self.name}::',
                         start=start,
                         end=end,
                         # GMT works properly for locators with gap
                         timezone='GMT',
                         context=self.context)
        dates = []
        # every record contains consequent intervals of data on disk
        for inx in range(len(result)):
            range_start = datetime.strptime(result['START_DATE'][inx], '%Y%m%d')
            range_end = datetime.strptime(result['END_DATE'][inx], '%Y%m%d')
            if only_last:
                return [_datetime2date(range_end)]
            day = range_start
            while day <= range_end:
                dates.append(_datetime2date(day))
                day += timedelta(days=1)
        return dates

    def __get_dates(self, only_last=False):
        """
        Return the list of dates with data, or only the latest such date.

        Locator ranges are scanned from newest to oldest, clipped to today.
        Each range is queried in windows: 365 days wide, or 1 day when
        ``only_last`` is set. A window's size doubles after every window
        that returned no data.

        Returns
        -------
        list of ``datetime.date`` when ``only_last`` is False (possibly empty),
        otherwise a single ``datetime.date`` or ``None`` when there is no data.
        """
        self._set_intervals()
        dates = []
        today = dt_date.today()
        today = datetime(today.year, today.month, today.day)
        # searching in reversed order in case we need only_last date
        for locator_start, locator_end in reversed(self._locator_date_ranges):
            # future is not loaded yet
            if locator_start > today:
                continue
            if locator_end > today:
                locator_end = today
            # locator date range can be very big, so splitting it in smaller parts
            # (because _show_loaded_time_ranges() can be very slow for big time ranges)
            # it is especially useful when we only need the last date
            start = end = locator_end
            delta = 1 if only_last else 365
            while locator_start < start:
                start = end - timedelta(days=delta)
                start = max(locator_start, start)
                loaded_dates = self._show_loaded_time_ranges(start, end, only_last=only_last)
                if only_last and loaded_dates:
                    return loaded_dates[-1]
                # windows are processed newest-first, so prepend to keep chronological order
                dates = loaded_dates + dates
                end = start
                # if we are not getting data, then increasing time range to find it faster
                if not loaded_dates:
                    delta *= 2
        if only_last and len(dates) == 0:
            return None  # no data on disk
        return dates

    def dates(self):
        """
        Returns list of dates for which data is available.

        Returns
        -------
        list of ``datetime.date``
            Dates in chronological order. An empty list when there is no data
            in the database.

        Examples
        --------
        >>> some_db = otp.databases()['SOME_DB']
        >>> some_db.dates()
        [datetime.date(2003, 12, 1)]
        """
        return self.__get_dates()

    def last_not_empty_date(self, last_date, days_back, timezone=None, tick_type=None):
        """
        Find the latest day that has data, searching backwards.

        Starts from ``last_date`` and goes at most ``days_back`` days back.
        The search stops early once it passes the earliest configured locator
        date. A day qualifies if :meth:`tick_types` is not empty for it, or,
        when ``tick_type`` is given, if that tick type is present on the day.

        Parameters
        ----------
        last_date: ``datetime.date`` or ``datetime.datetime``
            Day to start searching from (inclusive).
        days_back: int
            Maximum number of days to step back.
        timezone: str, optional
            Passed to :meth:`tick_types`.
        tick_type: str, optional
            Tick type that must be present on the found day.

        Returns
        -------
        ``datetime.date`` or ``None``
            ``None`` when no such day is found.
        """
        min_locator_date = self.min_locator_date()
        for days in range(days_back + 1):
            date = _datetime2date(last_date - timedelta(days=days))
            if date < min_locator_date:
                break
            tick_types = self.tick_types(date, timezone=timezone)
            if tick_type is None and tick_types:
                return date
            if tick_type is not None and tick_type in tick_types:
                return date
        return None

    @property
    def last_date(self):
        """
        The latest date on which db has data

        Returns
        -------
        ``datetime.date`` or ``None``
            Returns ``None`` when there is no data in the database

        Examples
        --------
        >>> some_db = otp.databases()['SOME_DB']
        >>> some_db.last_date
        datetime.date(2003, 12, 1)
        """
        return self._get_last_date()

    def _get_last_date(self, tick_type=None):
        """
        Return the latest loaded date, moved back to the closest non-empty day.

        Returns ``None`` when no date is loaded at all. If none of the 5 days
        before the last loaded date has data (for ``tick_type``, if given), a
        warning is issued and the last loaded date is returned as is.
        """
        last_date = self.__get_dates(only_last=True)
        if last_date is None:
            return None
        # It might happen that database loading processes is configured
        # to work over weekends and holidays and therefore
        # there are days that are configured but have no data, tick types and schema.
        # We want to find the closest not empty day because
        # we want to expose the most actual schema to end user.
        # For example, this is a case of OneTick Cloud NYSE_TAQ database.
        # We only scan 5 previous days to cover weekends + possible conjuncted holidays.
        # According to the official NYSE calendar there are no more than 5 closed days.
        date = self.last_not_empty_date(last_date, days_back=5, tick_type=tick_type)
        if date is None:
            warnings.warn(
                "Can't find not empty day for the last 5 days, using last configured day. "
                "Try to use .last_not_empty_date() function to find older not empty days."
            )
            return last_date
        return date

    def tick_types(self, date=None, timezone=None) -> List[str]:
        """
        Returns list of tick types for the ``date``.

        Parameters
        ----------
        date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional
            Date for the tick types look up. ``None`` means the :attr:`last_date`
        timezone: str, optional
            Timezone for the look up. ``None`` means the default timezone.

        Returns
        -------
        list
            List with string values of available tick types.

        Examples
        --------
        >>> nyse_taq_db = otp.databases()['NYSE_TAQ']
        >>> nyse_taq_db.tick_types(date=otp.dt(2022, 3, 1))
        ['QTE', 'TRD']
        """
        date = self.last_date if date is None else date
        if timezone is None:
            timezone = configuration.config.tz
        time_params = {}
        # without any date (empty database) run the query with default times
        if date is not None:
            time_params["start"] = date
            time_params["end"] = date + timedelta(days=1)
        # PY-458: don't use cache, it can return different result in some cases
        result = otp.run(otq.DbShowTickTypes(use_cache=False,
                                             show_schema=False,
                                             include_memdb=True),
                         symbols=f'{self.name}::',
                         **time_params,
                         timezone=timezone,
                         context=self.context)
        if len(result) == 0:
            return []
        return result['TICK_TYPE_NAME'].tolist()

    def min_locator_date(self):
        """
        Return the earliest configured locator date as a ``datetime.date``.

        Raises ``ValueError`` (from ``min`` of an empty sequence) when the
        database has no configured locator ranges.
        """
        self._set_intervals()
        min_date = min(obj[0] for obj in self._locator_date_ranges)
        return _datetime2date(min_date)

    def schema(self, date=None, tick_type=None, timezone=None) -> Dict[str, type]:
        """
        Gets the schema of the database.

        Parameters
        ----------
        date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional
            Date for the schema. ``None`` means the :attr:`last_date`
        tick_type: str, optional
            Specifies a tick type for schema. ``None`` means use the one available
            tick type, if there are multiple tick types then it raises the ``Exception``.
            It uses the :meth:`tick_types` method.
        timezone: str, optional
            Timezone used for searching tick types and for the schema query.
            ``None`` means the default timezone.

        Returns
        -------
        dict
            Dict where keys are field names and values are ``onetick.py`` :ref:`types <schema concept>`.
            It's compatible with the :attr:`onetick.py.Source.schema` methods.
            Fields with unsupported OneTick types are skipped with a warning.

        Examples
        --------
        >>> nyse_taq_db = otp.databases()['NYSE_TAQ']
        >>> nyse_taq_db.schema(tick_type='TRD', date=otp.dt(2022, 3, 1))
        {'PRICE': <class 'float'>, 'SIZE': <class 'int'>}
        """
        orig_date = date
        if date is None:
            date = self._get_last_date(tick_type=tick_type)
        if timezone is None:
            timezone = configuration.config.tz
        if tick_type is None:
            tick_types = self.tick_types(date=date, timezone=timezone)
            if len(tick_types) == 0:
                raise Exception("No tick types has found and specified")
            if len(tick_types) > 1:
                raise Exception("Database has multiple tick types, please specify using the `tick_type` parameter")
            tick_type = tick_types[0]
        if date is None:
            # it might happen when a database has no data on disks
            return {}
        # Convert explicitly into the datetime.date, because min_date and date
        # could be date or datetime types, and datetime is not comparable with datetime.date
        date = _datetime2date(date)

        def get_schema(use_cache: bool = True):
            return otp.run(otq.DbShowTickTypes(use_cache=use_cache,
                                               show_schema=True,
                                               include_memdb=True)
                           >> otq.WhereClause(where=f'TICK_TYPE_NAME="{tick_type}"'),
                           symbols=f'{self.name}::',
                           start=date,
                           end=date + timedelta(days=1),
                           timezone=timezone,
                           context=self.context)

        result = get_schema(use_cache=True)
        if not len(result):
            # in case cache settings in database are bad (e.g. BEXRTS-1220)
            result = get_schema(use_cache=False)
        if len(result):
            # filter schema by date
            date_to_filter = None
            if orig_date:
                # if date is passed as a parameter -- then use it
                date_to_filter = date
            else:
                # otherwise use the closest date
                date_to_filter = result['Time'].max()
            result = result[(result['Time'] >= pd.Timestamp(date_to_filter))]
            fields = zip(result['FIELD_NAME'].tolist(),
                         result['FIELD_TYPE_NAME'].tolist(),
                         result['FIELD_SIZE'].tolist())
        else:
            fields = []
        schema = {}
        for fname, ftype, fsize in fields:
            # order matters: substring checks, so e.g. 'UINT32' must precede 'INT'
            dtype = None
            if 'UINT32' in ftype:
                dtype = otp.uint
            elif 'UINT64' in ftype:
                dtype = otp.ulong
            elif 'INT8' in ftype:
                dtype = otp.byte
            elif 'INT16' in ftype:
                dtype = otp.short
            elif 'INT' in ftype:
                dtype = int
            elif 'MSEC' in ftype:
                dtype = otp.msectime
            elif 'NSEC' in ftype:
                dtype = otp.nsectime
            elif 'DOUBLE' in ftype or 'FLOAT' in ftype:
                dtype = float
            elif 'DECIMAL' in ftype:
                dtype = otp.decimal
            elif 'VARSTRING' in ftype:
                dtype = otp.varstring
            elif 'STRING' in ftype:
                if fsize == 64:
                    dtype = str
                else:
                    dtype = otp.string[fsize]
            else:
                warnings.warn(
                    f"Unsupported field type '{ftype}' for field '{fname}'. "
                    "Note that this field will be ignored "
                    "and will not be added to the python schema, "
                    "but will still remain in the OneTick schema."
                )
                continue
            schema[fname] = dtype
        return schema

    def symbols(self, date=None, timezone=None, tick_type=None, pattern='.*') -> List[str]:
        """
        Finds a list of available symbols in the database

        Parameters
        ----------
        date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional
            Date for the symbols look up. ``None`` means the :attr:`last_date`.
            If the database has no data either, the query is run without
            explicit start/end times (same as :meth:`tick_types`).
        tick_type: str, optional
            Tick type for symbols. ``None`` means union across all tick types.
        timezone: str, optional
            Timezone for the lookup. ``None`` means the default timezone.
        pattern: str
            Regular expression to select symbols.

        Returns
        -------
        list
            Symbol names without the database prefix.

        Examples
        --------
        >>> nyse_taq_db = otp.databases()['NYSE_TAQ']
        >>> nyse_taq_db.symbols(date=otp.dt(2022, 3, 1), tick_type='TRD', pattern='^AAP.*')
        ['AAP', 'AAPL']
        """
        if date is None:
            date = self.last_date
        if timezone is None:
            timezone = configuration.config.tz
        if tick_type is None:
            tick_type = ''
        time_params = {}
        # last_date is None for a database without data; `None + timedelta`
        # would raise TypeError, so fall back to the query's default times
        if date is not None:
            time_params['start'] = date
            time_params['end'] = date + timedelta(days=1)
        eps = otq.FindDbSymbols(pattern='%', tick_type_field=tick_type) \
            >> otq.AddField(field='SYMBOL', value='token(SYMBOL_NAME, -1, ":")') \
            >> otq.WhereClause(where=f'regex_match(SYMBOL, "{pattern}")') \
            >> otq.Table(fields='string SYMBOL')
        result = otp.run(eps,
                         symbols=f'{self.name}::',
                         **time_params,
                         timezone=timezone,
                         context=self.context)
        if len(result) == 0:
            return []
        return result['SYMBOL'].tolist()
def databases(context=utils.default, derived=False) -> Dict[str, DB]:
    """
    Gets all available databases in the ``context``

    Parameters
    ----------
    context: str, optional
        Context to run the query.
        If not set then default :py:attr:`context<onetick.py.configuration.Config.context>` is used.
    derived: bool, dict
        If False then derived databases are not returned.
        If dict then its items used as parameters to :py:func:`~onetick.py.derived_databases`
        (an empty dict means default parameters). The dict is not modified.
        If True then default parameters for :py:func:`~onetick.py.derived_databases` are used.

    See also
    --------
    | **SHOW_DB_LIST** OneTick event processor
    | :py:func:`derived_databases`

    Returns
    -------
    dict
        Dict where keys are database names and values are :class:`DB <onetick.py.db._inspection.DB>` objects
        with ``context`` specified.
    """
    dbs = otp.run(otq.ShowDbList().tick_type("ANY"),
                  symbols='LOCAL::',
                  # start and end times don't matter for this query, use some constants
                  start=db_constants.DEFAULT_START_DATE,
                  end=db_constants.DEFAULT_END_DATE,
                  context=context)
    # guard empty results like the other inspection queries in this module
    db_list = list(dbs['DATABASE_NAME']) if len(dbs) else []
    db_dict = {db_name: DB(db_name, context=context) for db_name in db_list}
    # an empty dict is still a (documented) request for derived databases
    if derived or isinstance(derived, dict):
        # copy: setdefault() below must not leak into the caller's dict
        kwargs = dict(derived) if isinstance(derived, dict) else {}
        kwargs.setdefault('context', context)
        db_dict.update(
            derived_databases(**kwargs)
        )
    return db_dict
def derived_databases(
    context=utils.default,
    start=None, end=None,
    selection_criteria='all',
    db=None,
    db_discovery_scope='query_host_only',
) -> Dict[str, DB]:
    """
    Gets available derived databases.

    Parameters
    ----------
    context: str, optional
        Context to run the query.
        If not set then default :py:attr:`context<onetick.py.configuration.Config.context>` is used.
    start: datetime, optional
        If both ``start`` and ``end`` are set, then listing databases in this range only.
        Otherwise list databases from all configured time ranges for databases.
    end: datetime, optional
        If both ``start`` and ``end`` are set, then listing databases in this range only.
        Otherwise list databases from all configured time ranges for databases.
    selection_criteria: str
        Possible values: *all*, *derived_from_current_db*, *direct_children_of_current_db*.
    db: str, optional
        Specifies database name if ``selection_criteria`` is set to
        *derived_from_current_db* or *direct_children_of_current_db*.
        Must be set in this case. Otherwise it is only used as the query symbol's
        database (``LOCAL`` when not set).
    db_discovery_scope: str
        When *query_host_and_all_reachable_hosts* is specified,
        an attempt will be performed to get derived databases from all reachable hosts.
        When *query_host_only* is specified,
        only derived databases from the host on which the query is performed will be returned.

    See also
    --------
    **SHOW_DERIVED_DB_LIST** OneTick event processor

    Returns
    -------
    dict
        Dict where keys are database names and values are :class:`DB <onetick.py.db._inspection.DB>` objects
        with ``context`` specified. Empty when there are no derived databases.

    Raises
    ------
    ValueError
        If ``selection_criteria`` is not *all* and ``db`` is not set.
    """
    if start and end:
        time_range = otq.ShowDerivedDbList.TimeRange.QUERY_TIME_INTERVAL
    else:
        # start and end times don't matter in this case, use some constants
        start = db_constants.DEFAULT_START_DATE
        end = db_constants.DEFAULT_END_DATE
        time_range = otq.ShowDerivedDbList.TimeRange.CONFIGURED_TIME_INTERVAL
    # map user-facing lowercase names onto the event processor's enum members
    selection_criteria = getattr(otq.ShowDerivedDbList.SelectionCriteria, selection_criteria.upper())
    db_discovery_scope = getattr(otq.ShowDerivedDbList.DbDiscoveryScope, db_discovery_scope.upper())
    if selection_criteria != otq.ShowDerivedDbList.SelectionCriteria.ALL and not db:
        raise ValueError(f"Parameter 'db' must be set when parameter 'selection_criteria' is {selection_criteria}")
    ep = otq.ShowDerivedDbList(
        time_range=time_range,
        selection_criteria=selection_criteria,
        db_discovery_scope=db_discovery_scope,
    )
    ep = ep.tick_type('ANY')
    db = db or 'LOCAL'
    dbs = otp.run(ep, symbols=f'{db}::', start=start, end=end, context=context)
    # no derived databases is a common case; don't rely on the column existing
    if len(dbs) == 0:
        return {}
    db_list = list(dbs['DERIVED_DB_NAME'])
    return {db_name: DB(db_name, context=context) for db_name in db_list}