# Source code for onetick.py.db._inspection

import warnings
from typing import Dict, List, Union

from datetime import date as dt_date, datetime, timedelta
from dateutil.tz import gettz
from onetick.py import configuration

import onetick.py as otp
from onetick.py.core import db_constants
import onetick.query as otq
import pandas as pd


def _datetime2date(dt: Union[dt_date, datetime]) -> dt_date:
    '''
    Build a plain ``datetime.date`` from the year, month and day of ``dt``.

    Any time-of-day part carried by ``dt`` is dropped; the result is always
    constructed anew as a ``datetime.date``.
    '''
    year, month, day = dt.year, dt.month, dt.day
    return dt_date(year, month, day)


class DB(object):
    """
    Handle for a single OneTick database, identified by its name.

    Provides look-ups of the dates with data, the tick types, the schema and
    the symbols of the database. Instances compare, order and hash by their
    string form, i.e. by the database name.
    """

    def __init__(self, name):
        self.name = name
        # Lazily filled by _set_intervals(); None means "not queried yet".
        self._locator_date_ranges = None

    def __eq__(self, obj):
        return str(self) == str(obj)

    def __hash__(self):
        # Defining __eq__ alone makes the class unhashable; hash the same
        # value that __eq__ compares so equal objects hash equally.
        return hash(str(self))

    def __lt__(self, obj):
        return str(self) < str(obj)

    def __str__(self):
        return self.name

    def _set_intervals(self):
        """
        Finds all date ranges from locators.
        These intervals are required to find all possible dates with data.
        It is only possible by querying the DB_SHOW_LOADED_TIME_RANGE
        against the largest possible query date range.

        The result is cached in ``self._locator_date_ranges`` as a list of
        ``(start, end)`` naive datetimes, so the query runs at most once
        per instance.
        """
        if self._locator_date_ranges is not None:
            return

        graph = otq.GraphQuery(otq.DbShowConfiguredTimeRanges(db_name=self.name).tick_type("ANY")
                               >> otq.Table(fields='long START_DATE, long END_DATE'))

        result = otp.run(graph,
                         symbols=f'{self.name}::',
                         # start and end times don't matter for this query, use some constants
                         start=db_constants.DEFAULT_START_DATE,
                         end=db_constants.DEFAULT_END_DATE,
                         # GMT, because start/end timestamp in locator are in GMT
                         timezone='GMT')

        date_ranges = []

        tz_gmt = gettz('GMT')
        for inx in range(len(result)):
            start_date = result['START_DATE'][inx]
            if start_date < 0:
                # On Windows datetime.fromtimestamp throws an OSError for negative values
                start_date = 0
            # values are divided by 1000 before being passed as a timestamp
            start = datetime.fromtimestamp(start_date / 1000, tz=tz_gmt)
            start = start.replace(tzinfo=None)
            end = datetime.fromtimestamp(result['END_DATE'][inx] / 1000, tz=tz_gmt)
            end = end.replace(tzinfo=None)

            date_ranges.append((start, end))

        # merge ranges if necessary to reduce number of queries
        # for `dates` property then
        self._locator_date_ranges = []
        start, end = None, None

        for t_start, t_end in date_ranges:
            if start is None:
                # first range: open the current merged interval
                start, end = t_start, t_end
            elif t_start == end:
                # adjacent to the current interval: extend it
                end = t_end
            else:
                # gap: close the current interval and start a new one
                self._locator_date_ranges.append((start, end))
                start, end = t_start, t_end

        if start is not None and end is not None:
            self._locator_date_ranges.append((start, end))

    def __get_dates(self, only_last=False):
        '''
        Returns list of dates with data.

        Parameters
        ----------
        only_last: bool
            If True, return only the latest date with data
            (``datetime.date``), or ``None`` when there is no data on disk.
            If False, return the list of all dates with data.
        '''
        import onetick.py as otp

        self._set_intervals()

        eps = otq.DbShowLoadedTimeRanges(use_cache=True).tick_type("ANY") \
            >> otq.WhereClause(where="NUM_LOADED_PARTITIONS > 0")
        if only_last:
            eps = eps >> otq.LastTick()
        eps = eps >> otq.Table(fields='string START_DATE, string END_DATE')

        graph = otq.GraphQuery(eps)

        if only_last:
            # Scan the newest locator range first: the first range that has
            # any loaded data then holds the overall latest date. Scanning in
            # configured order would return the end of the *earliest*
            # populated range when there are several non-adjacent ranges.
            locator_ranges = sorted(self._locator_date_ranges,
                                    key=lambda rng: rng[0],
                                    reverse=True)
        else:
            locator_ranges = self._locator_date_ranges

        dates = []

        for range_start, range_end in locator_ranges:
            result = otp.run(graph,
                             symbols=f'{self.name}::',
                             start=range_start,
                             end=range_end,
                             timezone='GMT')  # GMT works properly for locators with gap

            # every record contains consequent intervals of data on disk
            for inx in range(len(result)):
                loaded_start = datetime.strptime(result['START_DATE'][inx], '%Y%m%d')
                loaded_end = datetime.strptime(result['END_DATE'][inx], '%Y%m%d')
                if only_last:
                    return _datetime2date(loaded_end)
                while loaded_start <= loaded_end:
                    dates.append(_datetime2date(loaded_start))
                    loaded_start += timedelta(days=1)

        if only_last:
            return None  # no data on disk

        return dates

    def dates(self):
        '''
        Returns list of dates for which data is available

        Returns
        -------
        list of ``datetime.date``
            The list is empty when there is no data in the database

        Examples
        --------
        >>> some_db = otp.databases()['SOME_DB']
        >>> some_db.dates()
        [datetime.date(2003, 12, 1)]
        '''
        return self.__get_dates()

    def last_not_empty_date(self, last_date, days_back, timezone=None, tick_type=None):
        """
        Find first day that has data
        starting from `last_date` and going `days_back` number of days back.

        Parameters
        ----------
        last_date: ``datetime.date`` or ``datetime.datetime``
            The day to start the search from.
        days_back: int
            How many days before ``last_date`` are checked in addition to
            ``last_date`` itself.
        timezone: str, optional
            Passed to :meth:`tick_types`.
        tick_type: str, optional
            If set, a day counts as not empty only when it has this tick type;
            otherwise any tick type is enough.

        Returns
        -------
        ``datetime.date`` or ``None``
            ``None`` when no such day is found. The search stops early once
            the day goes before the minimal locator date.

        Raises
        ------
        ValueError
            Raised by :meth:`min_locator_date` when the database has no
            locator date ranges.
        """
        min_locator_date = self.min_locator_date()
        for i in range(days_back + 1):
            date = _datetime2date(last_date - timedelta(days=i))
            if date < min_locator_date:
                break
            tick_types = self.tick_types(date, timezone=timezone)
            if tick_type is None and tick_types:
                return date
            if tick_type is not None and tick_type in tick_types:
                return date
        return None

    @property
    def last_date(self):
        """
        The latest date on which db has data

        Returns
        -------
        ``datetime.date`` or ``None``
            Returns ``None`` when there is no data in the database

        Examples
        --------
        >>> some_db = otp.databases()['SOME_DB']
        >>> some_db.last_date
        datetime.date(2003, 12, 1)
        """
        return self._get_last_date()

    def _get_last_date(self, tick_type=None):
        """
        Return the latest not empty date, optionally for the given
        ``tick_type``; ``None`` when there is no data on disk.
        """
        last_date = self.__get_dates(only_last=True)
        if last_date is None:
            return None
        # It might happen that database loading processes is configured
        # to work over weekends and holidays and therefore
        # there are days that are configured but have no data, tick types and schema.
        # We want to find the closest not empty day because
        # we want to expose the most actual schema to end user.
        # For example, this is a case of OneTick Cloud NYSE_TAQ database.
        # We only scan 5 previous days to cover weekends + possible conjuncted holidays.
        # According to the official NYSE calendar there are no more than 5 closed days.
        date = self.last_not_empty_date(last_date, days_back=5, tick_type=tick_type)
        if date is None:
            warnings.warn(
                "Can't find not empty day for the last 5 days, using last configured day. "
                "Try to use .last_not_empty_date() function to find older not empty days."
            )
            return last_date
        return date

    def tick_types(self, date=None, timezone=None) -> List[str]:
        '''
        Returns list of tick types for the ``date``.

        Parameters
        ----------
        date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional
            Date for the tick types look up. ``None`` means the :attr:`last_date`
        timezone: str, optional
            Timezone for the look up. ``None`` means the default timezone.

        Returns
        -------
        list
            List with string values of available tick types.

        Examples
        --------
        >>> nyse_taq_db = otp.databases()['NYSE_TAQ']
        >>> nyse_taq_db.tick_types(date=otp.dt(2022, 3, 1))
        ['QTE', 'TRD']
        '''
        import onetick.py as otp
        date = self.last_date if date is None else date
        if timezone is None:
            timezone = configuration.config.tz
        time_params = {}
        if date is not None:
            # last_date may also be None (no data); then no explicit
            # start/end are passed to the query at all
            time_params["start"] = date
            time_params["end"] = date + timedelta(days=1)
        # PY-458: don't use cache, it can return different result in some cases
        result = otp.run(otq.DbShowTickTypes(use_cache=False,
                                             show_schema=False,
                                             include_memdb=True),
                         symbols=f'{self.name}::',
                         **time_params,
                         timezone=timezone)
        if len(result) == 0:
            return []
        return result['TICK_TYPE_NAME'].tolist()

    def min_locator_date(self):
        """
        Return the earliest start among the locator date ranges as a
        ``datetime.date``.

        Raises
        ------
        ValueError
            From ``min()`` when there are no locator date ranges.
        """
        self._set_intervals()
        min_date = min(obj[0] for obj in self._locator_date_ranges)
        return _datetime2date(min_date)

    def schema(self, date=None, tick_type=None, timezone=None) -> Dict[str, type]:
        '''
        Gets the schema of the database.

        Parameters
        ----------
        date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional
            Date for the schema. ``None`` means the :attr:`last_date`
        tick_type: str, optional
            Specifies a tick type for schema. ``None`` means use the one available
            tick type, if there are multiple tick types then it raises the ``Exception``.
            It uses the :meth:`tick_types` method.
        timezone: str, optional
            Allows to specify a timezone for searching tick types.

        Returns
        -------
        dict
            Dict where keys are field names and values are ``onetick.py`` :ref:`types <schema concept>`.
            It's compatible with the :attr:`onetick.py.Source.schema` methods.
            Fields of unsupported types are skipped with a warning.

        Raises
        ------
        Exception
            When ``tick_type`` is not set and the database has no tick types
            or more than one tick type for the date.

        Examples
        --------
        >>> nyse_taq_db = otp.databases()['NYSE_TAQ']
        >>> nyse_taq_db.schema(tick_type='TRD', date=otp.dt(2022, 3, 1))
        {'PRICE': <class 'float'>, 'SIZE': <class 'int'>}
        '''
        import onetick.py as otp

        # remember whether the caller passed a date: it changes the filtering below
        orig_date = date

        if date is None:
            date = self._get_last_date(tick_type=tick_type)
        if timezone is None:
            timezone = configuration.config.tz

        if tick_type is None:
            tick_types = self.tick_types(date=date, timezone=timezone)
            if len(tick_types) == 0:
                raise Exception("No tick types has found and specified")
            if len(tick_types) > 1:
                raise Exception("Database has multiple tick types, please specify using the `tick_type` parameter")

            tick_type = tick_types[0]

        if date is None:
            # it might happen when a database has no data on disks
            return {}

        # Convert explicitly into the datetime.date, because min_date and date
        # could be date or datetime types, and datetime is not comparable with datetime.date
        date = _datetime2date(date)

        def get_schema(use_cache: bool = True):
            return otp.run(otq.DbShowTickTypes(use_cache=use_cache,
                                               show_schema=True,
                                               include_memdb=True)
                           >> otq.WhereClause(where=f'TICK_TYPE_NAME="{tick_type}"'),
                           symbols=f'{self.name}::',
                           start=date,
                           end=date + timedelta(days=1),
                           timezone=timezone)

        result = get_schema(use_cache=True)
        if not len(result):
            # in case cache settings in database are bad (e.g. BEXRTS-1220)
            result = get_schema(use_cache=False)

        if len(result):
            # filter schema by date
            if orig_date:
                # if date is passed as a parameter -- then use it
                date_to_filter = date
            else:
                # otherwise use the closest date
                date_to_filter = result['Time'].max()
            result = result[(result['Time'] >= pd.Timestamp(date_to_filter))]

            fields = zip(result['FIELD_NAME'].tolist(),
                         result['FIELD_TYPE_NAME'].tolist(),
                         result['FIELD_SIZE'].tolist())
        else:
            fields = []

        schema = {}

        for fname, ftype, fsize in fields:
            dtype = None

            # Order matters: these are substring checks, so the more specific
            # names (UINT32, INT8, VARSTRING, ...) must come before INT / STRING.
            if 'UINT32' in ftype:
                dtype = otp.uint
            elif 'UINT64' in ftype:
                dtype = otp.ulong
            elif 'INT8' in ftype:
                dtype = otp.byte
            elif 'INT16' in ftype:
                dtype = otp.short
            elif 'INT' in ftype:
                dtype = int
            elif 'MSEC' in ftype:
                dtype = otp.msectime
            elif 'NSEC' in ftype:
                dtype = otp.nsectime
            elif 'DOUBLE' in ftype or 'FLOAT' in ftype:
                dtype = float
            elif 'DECIMAL' in ftype:
                dtype = otp.decimal
            elif 'VARSTRING' in ftype:
                dtype = otp.varstring
            elif 'STRING' in ftype:
                # size 64 maps to plain str, any other size to a sized otp.string
                if fsize == 64:
                    dtype = str
                else:
                    dtype = otp.string[fsize]
            else:
                warnings.warn(
                    f"Unsupported field type '{ftype}' for field '{fname}'. "
                    "Note that this field will be ignored "
                    "and will not be added to the python schema, "
                    "but will still remain in the OneTick schema."
                )
                continue

            schema[fname] = dtype

        return schema

    def symbols(self, date=None, timezone=None, tick_type=None, pattern='.*') -> List[str]:
        '''
        Finds a list of available symbols in the database

        Parameters
        ----------
        date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional
            Date for the symbols look up. ``None`` means the :attr:`last_date`
        tick_type: str, optional
            Tick type for symbols. ``None`` means union across all tick types.
        timezone: str, optional
            Timezone for the lookup. ``None`` means the default timezone.
        pattern: str
            Regular expression to select symbols.

        Returns
        -------
        list
            List of symbol names. Empty when nothing matches, or when ``date``
            is not set and the database has no data.

        Examples
        --------
        >>> nyse_taq_db = otp.databases()['NYSE_TAQ']
        >>> nyse_taq_db.symbols(date=otp.dt(2022, 3, 1), tick_type='TRD', pattern='^AAP.*')
        ['AAP', 'AAPL']
        '''
        import onetick.py as otp
        if date is None:
            date = self.last_date
        if date is None:
            # No data on disk: there is no date to query, and
            # `None + timedelta` below would raise a TypeError.
            return []
        if timezone is None:
            timezone = configuration.config.tz
        if tick_type is None:
            tick_type = ''

        eps = otq.FindDbSymbols(pattern='%', tick_type_field=tick_type) \
            >> otq.AddField(field='SYMBOL', value='token(SYMBOL_NAME, -1, ":")') \
            >> otq.WhereClause(where=f'regex_match(SYMBOL, "{pattern}")') \
            >> otq.Table(fields='string SYMBOL')

        result = otp.run(eps,
                         symbols=f'{self.name}::',
                         start=date,
                         end=date + timedelta(days=1),
                         timezone=timezone)

        if len(result) == 0:
            return []

        return result['SYMBOL'].tolist()
def databases(context=None, derived=False) -> Dict[str, DB]:
    """
    Gets all available databases in the ``context``

    Parameters
    ----------
    context: str, optional
        If it is not set then the default context from config is used
    derived: bool, dict
        If False then derived databases are not returned.
        If dict then its items used as parameters to :py:func:`~onetick.py.derived_databases`
        (the passed dict itself is not modified).
        If True then default parameters for :py:func:`~onetick.py.derived_databases` are used.

    See also
    --------
    | **SHOW_DB_LIST** OneTick event processor
    | :py:func:`derived_databases`

    Returns
    -------
    dict
        Dict where keys are database names and
        values are :class:`DB <onetick.py.db._inspection.DB>` class.
    """
    if context is None:
        context = configuration.config.context

    dbs = otp.run(otq.ShowDbList().tick_type("ANY"),
                  symbols='LOCAL::',
                  # start and end times don't matter for this query, use some constants
                  start=db_constants.DEFAULT_START_DATE,
                  end=db_constants.DEFAULT_END_DATE,
                  context=context)

    db_list = list(dbs['DATABASE_NAME'])
    db_dict = {db_name: DB(db_name) for db_name in db_list}

    if derived:
        # Work on a copy: setdefault() below must not add a 'context' key
        # to the dict object owned by the caller.
        kwargs = dict(derived) if isinstance(derived, dict) else {}
        kwargs.setdefault('context', context)
        db_dict.update(
            derived_databases(**kwargs)
        )
    return db_dict
def derived_databases(
    context=None,
    start=None,
    end=None,
    selection_criteria='all',
    db=None,
    db_discovery_scope='query_host_only',
) -> Dict[str, DB]:
    """
    Lists the derived databases that are available.

    Parameters
    ----------
    context: str, optional
        Context to run the query in; the default context from config
        is used when it is not set.
    start: datetime, optional
        Together with ``end`` limits the listing to this time range.
        If either of them is missing, databases from all configured
        time ranges are listed.
    end: datetime, optional
        Together with ``start`` limits the listing to this time range.
        If either of them is missing, databases from all configured
        time ranges are listed.
    selection_criteria: str
        One of *all*, *derived_from_current_db*, *direct_children_of_current_db*.
    db: str, optional
        Database name; required when ``selection_criteria`` is
        *derived_from_current_db* or *direct_children_of_current_db*.
    db_discovery_scope: str
        *query_host_only* returns only derived databases from the host
        on which the query is performed;
        *query_host_and_all_reachable_hosts* attempts to get derived
        databases from all reachable hosts.

    See also
    --------
    **SHOW_DERIVED_DB_LIST** OneTick event processor

    Returns
    -------
    dict
        Maps database names to
        :class:`DB <onetick.py.db._inspection.DB>` objects.

    Raises
    ------
    ValueError
        When ``selection_criteria`` is not *all* and ``db`` is not set.
    """
    context = configuration.config.context if context is None else context

    if not (start and end):
        # start and end times don't matter in this case, use some constants
        start, end = db_constants.DEFAULT_START_DATE, db_constants.DEFAULT_END_DATE
        time_range = otq.ShowDerivedDbList.TimeRange.CONFIGURED_TIME_INTERVAL
    else:
        time_range = otq.ShowDerivedDbList.TimeRange.QUERY_TIME_INTERVAL

    # translate the string arguments into the corresponding enum members
    selection_criteria = getattr(otq.ShowDerivedDbList.SelectionCriteria,
                                 selection_criteria.upper())
    db_discovery_scope = getattr(otq.ShowDerivedDbList.DbDiscoveryScope,
                                 db_discovery_scope.upper())

    needs_db = selection_criteria != otq.ShowDerivedDbList.SelectionCriteria.ALL
    if needs_db and not db:
        raise ValueError(f"Parameter 'db' must be set when parameter 'selection_criteria' is {selection_criteria}")

    ep = otq.ShowDerivedDbList(
        time_range=time_range,
        selection_criteria=selection_criteria,
        db_discovery_scope=db_discovery_scope,
    ).tick_type('ANY')

    if not db:
        db = 'LOCAL'

    dbs = otp.run(ep, symbols=f'{db}::', start=start, end=end, context=context)

    derived_names = list(dbs['DERIVED_DB_NAME'])
    return dict((derived_name, DB(derived_name)) for derived_name in derived_names)