import warnings
from typing import Dict, List, Union
from datetime import date as dt_date, datetime, timedelta
from dateutil.tz import gettz
from onetick.py import configuration
import onetick.py as otp
from onetick.py.core import db_constants
import onetick.query as otq
import pandas as pd
def _datetime2date(dt: Union[dt_date, datetime]) -> dt_date:
''' Convert datetime and date explicitly into the datetime.date '''
return dt_date(dt.year, dt.month, dt.day)
[docs]class DB(object):
def __init__(self, name):
self.name = name
self._locator_date_ranges = None
def __eq__(self, obj):
return str(self) == str(obj)
def __lt__(self, obj):
return str(self) < str(obj)
def __str__(self):
return self.name
def _set_intervals(self):
"""
Finds all date ranges from locators.
These intervals are required to find all possible dates with data.
It is only possible by querying the DB_SHOW_LOADED_TIME_RANGE
against the largest possible query date range.
"""
if self._locator_date_ranges is None:
graph = otq.GraphQuery(otq.DbShowConfiguredTimeRanges(db_name=self.name).tick_type("ANY")
>> otq.Table(fields='long START_DATE, long END_DATE'))
result = otp.run(graph,
symbols='LOCAL::',
# start and end times don't matter for this query, use some constants
start=db_constants.DEFAULT_START_DATE,
end=db_constants.DEFAULT_END_DATE,
# GMT, because start/end timestamp in locator are in GMT
timezone='GMT')
date_ranges = []
tz_gmt = gettz('GMT')
for inx in range(len(result)):
start_date = result['START_DATE'][inx]
if start_date < 0:
# On Windows datetime.fromtimestamp throws an OSError for negative values
start_date = 0
start = datetime.fromtimestamp(start_date / 1000, tz=tz_gmt)
start = start.replace(tzinfo=None)
end = datetime.fromtimestamp(result['END_DATE'][inx] / 1000, tz=tz_gmt)
end = end.replace(tzinfo=None)
date_ranges.append((start, end))
# merge ranges if necessary to reduce number of queries
# for `dates` property then
self._locator_date_ranges = []
start, end = None, None
for t_start, t_end in date_ranges:
if start is None:
start = t_start
if end is None:
end = t_end
else:
if t_start == end:
end = t_end
else:
self._locator_date_ranges.append((start, end))
start, end = t_start, t_end
if start and end:
self._locator_date_ranges.append((start, end))
def __get_dates(self, only_last=False):
''' Returns list of dates with data '''
import onetick.py as otp
self._set_intervals()
eps = otq.DbShowLoadedTimeRanges(use_cache=True).tick_type("ANY") \
>> otq.WhereClause(where="NUM_LOADED_PARTITIONS > 0")
if only_last:
eps = eps >> otq.LastTick()
eps = eps >> otq.Table(fields='string START_DATE, string END_DATE')
graph = otq.GraphQuery(eps)
dates = []
for start, end in self._locator_date_ranges:
result = otp.run(graph,
symbols=f'{self.name}::',
start=start,
end=end,
timezone='GMT') # GMT works properly for locators with gap
# every record contains consequent intervals of data on disk
for inx in range(len(result)):
start = datetime.strptime(result['START_DATE'][inx], '%Y%m%d')
end = datetime.strptime(result['END_DATE'][inx], '%Y%m%d')
if only_last:
return _datetime2date(end)
while start <= end:
dates.append(_datetime2date(start))
start += timedelta(days=1)
if only_last and len(dates) == 0:
return None # no data on disk
return dates
[docs] def dates(self):
''' Returns list of dates for which data is available
Returns
-------
``datetime.date`` or ``None``
Retruns ``None`` when there is no data in the database
Examples
--------
>>> some_db = otp.databases()['SOME_DB']
>>> some_db.dates()
[datetime.date(2003, 12, 1)]
'''
return self.__get_dates()
def last_not_empty_date(self, last_date, days_back, timezone=None):
"""
Find first day that has data
starting from `last_date` and going `days_back` number of days back.
"""
for i in range(days_back + 1):
date = last_date - timedelta(days=i)
if self.tick_types(date, timezone=timezone):
return date
return None
@property
def last_date(self):
''' The latest date on which db has data
Returns
-------
``datetime.datetime`` or ``None``
Returns ``None`` when there is no data in the database
Examples
--------
>>> some_db = otp.databases()['SOME_DB']
>>> some_db.last_date
datetime.date(2003, 12, 1)
'''
last_date = self.__get_dates(only_last=True)
if last_date is None:
return None
# It might happen that database loading processes is configured
# to work over weekends and holidays and therefore
# there are days that are configured but have no data, tick types and schema.
# We want to find the closest not empty day because
# we want to expose the most actual schema to end user.
# For example, this is a case of OneTick Cloud NYSE_TAQ database.
# We only scan 5 previous days to cover weekends + possible conjuncted holidays.
# According to the official NYSE calendar there are no more than 5 closed days.
date = self.last_not_empty_date(last_date, days_back=5)
if date is None:
warnings.warn(
"Can't find not empty day for the last 5 days, using last configured day. "
"Try to use .last_not_empty_date() function to find older not empty days."
)
return last_date
return date
[docs] def tick_types(self, date=None, timezone=None) -> List[str]:
''' Returns list of tick types for the ``date``.
Parameters
----------
date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional
Date for the tick types look up. ``None`` means the :attr:`last_date`
timezone: str, optional
Timezone for the look up. ``None`` means the default timezeone.
Returns
-------
list
List with string values of available tick types.
Examples
--------
>>> nyse_taq_db = otp.databases()['NYSE_TAQ']
>>> nyse_taq_db.tick_types(date=otp.dt(2022, 3, 1))
['QTE', 'TRD']
'''
import onetick.py as otp
date = self.last_date if date is None else date
if timezone is None:
timezone = configuration.config.tz
time_params = {}
if date is not None:
time_params["start"] = date
time_params["end"] = date + timedelta(days=1)
# PY-458: don't use cache, it can return different result in some cases
result = otp.run(otq.DbShowTickTypes(use_cache=False,
show_schema=False,
include_memdb=False),
symbols=f'{self.name}::',
**time_params,
timezone=timezone)
if len(result) == 0:
return []
return result['TICK_TYPE_NAME'].tolist()
[docs] def schema(self, date=None, tick_type=None, timezone=None) -> Dict[str, type]:
''' Gets the schema of the database.
Parameters
----------
date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional
Date for the schema. ``None`` means the :attr:`last_date`
tick_type: str, optional
Specifies a tick type for schema. ``None`` means use the one available
tick type, if there are multiple tick types then it raises the ``Exception``.
It uses the :meth:`tick_types` method.
timezone: str, optional
Allows to specify a timezone for searching tick types.
Returns
-------
dict
Dict where keys are field names and values are ``onetick.py`` :ref:`types <schema concept>`.
It's compatible with the :attr:`onetick.py.Source.schema` methods.
Examples
--------
>>> nyse_taq_db = otp.databases()['NYSE_TAQ']
>>> nyse_taq_db.schema(tick_type='TRD', date=otp.dt(2022, 3, 1))
{'PRICE': <class 'float'>, 'SIZE': <class 'int'>}
'''
import onetick.py as otp
orig_date = date
if date is None:
date = self.last_date
if timezone is None:
timezone = configuration.config.tz
if tick_type is None:
tick_types = self.tick_types(date=date, timezone=timezone)
if len(tick_types) == 0:
raise Exception("No tick types has found and specified")
if len(tick_types) > 1:
raise Exception("Database has multiple tick types, please specify using the `tick_type` parameter")
tick_type = tick_types[0]
if date is None:
# it might happen when a database has no data on disks
return {}
if self._locator_date_ranges is None:
# if locator date ranges are not populated then set them
self._set_intervals()
min_locator_date = min(obj[0] for obj in self._locator_date_ranges)
# Convert explicitly into the datetime.date, because min_date and date
# could be date or datetime types, and datetime is not comparable with datetime.date
date = _datetime2date(date)
min_locator_date = _datetime2date(min_locator_date)
def get_schema(use_cache: bool = True):
return otp.run(otq.DbShowTickTypes(use_cache=use_cache,
show_schema=True,
include_memdb=True)
>> otq.WhereClause(where=f'TICK_TYPE_NAME="{tick_type}"'),
symbols=f'{self.name}::',
start=date,
end=date + timedelta(days=1),
timezone=timezone)
result = get_schema(use_cache=True)
if not len(result):
# in case cache settings in database are bad (e.g. BEXRTS-1220)
result = get_schema(use_cache=False)
if len(result):
# filter schema by date
date_to_filter = None
if orig_date:
# if date is passed as a parameter -- then use it
date_to_filter = date
else:
# otherwise use the closest date
date_to_filter = result['Time'].max()
result = result[(result['Time'] >= pd.Timestamp(date_to_filter))]
fields = zip(result['FIELD_NAME'].tolist(),
result['FIELD_TYPE_NAME'].tolist(),
result['FIELD_SIZE'].tolist())
else:
fields = []
schema = {}
for fname, ftype, fsize in fields:
dtype = None
if 'INT' in ftype:
dtype = int
elif 'MSEC' in ftype:
dtype = otp.msectime
elif 'NSEC' in ftype:
dtype = otp.nsectime
elif 'DOUBLE' in ftype or 'FLOAT' in ftype:
dtype = float
elif 'VARSTRING' in ftype:
dtype = otp.varstring
elif 'STRING' in ftype:
if fsize == 64:
dtype = str
else:
dtype = otp.string[fsize]
else:
warnings.warn(
f"Unsupported field type '{ftype}' for field '{fname}'. "
"Note that this field will be ignored "
"and will not be added to the python schema, "
"but will still remain in the OneTick schema."
)
continue
schema[fname] = dtype
return schema
[docs] def symbols(self, date=None, timezone=None, tick_type=None, pattern='.*') -> List[str]:
''' Finds a list of available symbols in the database
Parameters
----------
date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional
Date for the symbols look up. ``None`` means the :attr:`last_date`
tick_type: str, optional
Tick type for symbols. ``None`` means union across all tick types.
timezone: str, optional
Timezone for the lookup. ``None`` means the default timezone.
pattern: str
Regular expression to select symbols.
Examples
--------
>>> nyse_taq_db = otp.databases()['NYSE_TAQ']
>>> nyse_taq_db.symbols(date=otp.dt(2022, 3, 1), tick_type='TRD', pattern='^AAP.*')
['AAP', 'AAPL']
'''
import onetick.py as otp
if date is None:
date = self.last_date
if timezone is None:
timezone = configuration.config.tz
if tick_type is None:
tick_type = ''
eps = otq.FindDbSymbols(pattern='%', tick_type_field=tick_type) \
>> otq.AddField(field='SYMBOL', value='token(SYMBOL_NAME, -1, ":")') \
>> otq.WhereClause(where=f'regex_match(SYMBOL, "{pattern}")') \
>> otq.Table(fields='string SYMBOL')
result = otp.run(eps,
symbols=f'{self.name}::',
start=date,
end=date + timedelta(days=1),
timezone=timezone)
if len(result) == 0:
return []
return result['SYMBOL'].tolist()
[docs]def databases(context=None) -> Dict[str, DB]:
"""
Gets all available databases in the ``context``
Parameters
----------
context: str, optional
If it is not set then the default context from config is used
Returns
-------
dict
Dict where keys are database names and values are :class:`DB <onetick.py.db._inspection.DB>` class.
"""
if context is None:
context = configuration.config.context
dbs = otp.run(otq.ShowDbList().tick_type("ANY"),
symbols='LOCAL::',
# start and end times don't matter for this query, use some constants
start=db_constants.DEFAULT_START_DATE,
end=db_constants.DEFAULT_END_DATE,
context=context)
db_list = list(dbs['DATABASE_NAME'])
return {db_name: DB(db_name) for db_name in db_list}