importwarningsfromtypingimportDict,List,Unionfromdatetimeimportdateasdt_date,datetime,timedeltafromdateutil.tzimportgettzfromonetick.pyimportconfigurationimportonetick.pyasotpfromonetick.py.coreimportdb_constantsimportonetick.queryasotqimportpandasaspddef_datetime2date(dt:Union[dt_date,datetime])->dt_date:''' Convert datetime and date explicitly into the datetime.date '''returndt_date(dt.year,dt.month,dt.day)
[docs]classDB(object):def__init__(self,name):self.name=nameself._locator_date_ranges=Nonedef__eq__(self,obj):returnstr(self)==str(obj)def__lt__(self,obj):returnstr(self)<str(obj)def__str__(self):returnself.namedef_set_intervals(self):""" Finds all date ranges from locators. These intervals are required to find all possible dates with data. It is only possible by querying the DB_SHOW_LOADED_TIME_RANGE against the largest possible query date range. """ifself._locator_date_rangesisNone:graph=otq.GraphQuery(otq.DbShowConfiguredTimeRanges(db_name=self.name).tick_type("ANY")>>otq.Table(fields='long START_DATE, long END_DATE'))result=otp.run(graph,symbols='LOCAL::',# start and end times don't matter for this query, use some constantsstart=db_constants.DEFAULT_START_DATE,end=db_constants.DEFAULT_END_DATE,# GMT, because start/end timestamp in locator are in GMTtimezone='GMT')date_ranges=[]tz_gmt=gettz('GMT')forinxinrange(len(result)):start_date=result['START_DATE'][inx]ifstart_date<0:# On Windows datetime.fromtimestamp throws an OSError for negative valuesstart_date=0start=datetime.fromtimestamp(start_date/1000,tz=tz_gmt)start=start.replace(tzinfo=None)end=datetime.fromtimestamp(result['END_DATE'][inx]/1000,tz=tz_gmt)end=end.replace(tzinfo=None)date_ranges.append((start,end))# merge ranges if necessary to reduce number of queries# for `dates` property thenself._locator_date_ranges=[]start,end=None,Nonefort_start,t_endindate_ranges:ifstartisNone:start=t_startifendisNone:end=t_endelse:ift_start==end:end=t_endelse:self._locator_date_ranges.append((start,end))start,end=t_start,t_endifstartandend:self._locator_date_ranges.append((start,end))def__get_dates(self,only_last=False):''' Returns list of dates with data '''importonetick.pyasotpself._set_intervals()eps=otq.DbShowLoadedTimeRanges(use_cache=True).tick_type("ANY") \
>>otq.WhereClause(where="NUM_LOADED_PARTITIONS > 0")ifonly_last:eps=eps>>otq.LastTick()eps=eps>>otq.Table(fields='string START_DATE, string END_DATE')graph=otq.GraphQuery(eps)dates=[]forstart,endinself._locator_date_ranges:result=otp.run(graph,symbols=f'{self.name}::',start=start,end=end,timezone='GMT')# GMT works properly for locators with gap# every record contains consequent intervals of data on diskforinxinrange(len(result)):start=datetime.strptime(result['START_DATE'][inx],'%Y%m%d')end=datetime.strptime(result['END_DATE'][inx],'%Y%m%d')ifonly_last:return_datetime2date(end)whilestart<=end:dates.append(_datetime2date(start))start+=timedelta(days=1)ifonly_lastandlen(dates)==0:returnNone# no data on diskreturndates
[docs]defdates(self):''' Returns list of dates for which data is available Returns ------- ``datetime.date`` or ``None`` Returns ``None`` when there is no data in the database Examples -------- >>> some_db = otp.databases()['SOME_DB'] >>> some_db.dates() [datetime.date(2003, 12, 1)] '''returnself.__get_dates()
deflast_not_empty_date(self,last_date,days_back,timezone=None,tick_type=None):""" Find first day that has data starting from `last_date` and going `days_back` number of days back. """min_locator_date=self.min_locator_date()foriinrange(days_back+1):date=_datetime2date(last_date-timedelta(days=i))ifdate<min_locator_date:breaktick_types=self.tick_types(date,timezone=timezone)iftick_typeisNoneandtick_types:returndateiftick_typeisnotNoneandtick_typeintick_types:returndatereturnNone@propertydeflast_date(self):""" The latest date on which db has data Returns ------- ``datetime.datetime`` or ``None`` Returns ``None`` when there is no data in the database Examples -------- >>> some_db = otp.databases()['SOME_DB'] >>> some_db.last_date datetime.date(2003, 12, 1) """returnself._get_last_date()def_get_last_date(self,tick_type=None):last_date=self.__get_dates(only_last=True)iflast_dateisNone:returnNone# It might happen that database loading processes is configured# to work over weekends and holidays and therefore# there are days that are configured but have no data, tick types and schema.# We want to find the closest not empty day because# we want to expose the most actual schema to end user.# For example, this is a case of OneTick Cloud NYSE_TAQ database.# We only scan 5 previous days to cover weekends + possible conjuncted holidays.# According to the official NYSE calendar there are no more than 5 closed days.date=self.last_not_empty_date(last_date,days_back=5,tick_type=tick_type)ifdateisNone:warnings.warn("Can't find not empty day for the last 5 days, using last configured day. ""Try to use .last_not_empty_date() function to find older not empty days.")returnlast_datereturndate
[docs]deftick_types(self,date=None,timezone=None)->List[str]:''' Returns list of tick types for the ``date``. Parameters ---------- date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional Date for the tick types look up. ``None`` means the :attr:`last_date` timezone: str, optional Timezone for the look up. ``None`` means the default timezone. Returns ------- list List with string values of available tick types. Examples -------- >>> nyse_taq_db = otp.databases()['NYSE_TAQ'] >>> nyse_taq_db.tick_types(date=otp.dt(2022, 3, 1)) ['QTE', 'TRD'] '''importonetick.pyasotpdate=self.last_dateifdateisNoneelsedateiftimezoneisNone:timezone=configuration.config.tztime_params={}ifdateisnotNone:time_params["start"]=datetime_params["end"]=date+timedelta(days=1)# PY-458: don't use cache, it can return different result in some casesresult=otp.run(otq.DbShowTickTypes(use_cache=False,show_schema=False,include_memdb=True),symbols=f'{self.name}::',**time_params,timezone=timezone)iflen(result)==0:return[]returnresult['TICK_TYPE_NAME'].tolist()
[docs]defschema(self,date=None,tick_type=None,timezone=None)->Dict[str,type]:''' Gets the schema of the database. Parameters ---------- date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional Date for the schema. ``None`` means the :attr:`last_date` tick_type: str, optional Specifies a tick type for schema. ``None`` means use the one available tick type, if there are multiple tick types then it raises the ``Exception``. It uses the :meth:`tick_types` method. timezone: str, optional Allows to specify a timezone for searching tick types. Returns ------- dict Dict where keys are field names and values are ``onetick.py`` :ref:`types <schema concept>`. It's compatible with the :attr:`onetick.py.Source.schema` methods. Examples -------- >>> nyse_taq_db = otp.databases()['NYSE_TAQ'] >>> nyse_taq_db.schema(tick_type='TRD', date=otp.dt(2022, 3, 1)) {'PRICE': <class 'float'>, 'SIZE': <class 'int'>} '''importonetick.pyasotporig_date=dateifdateisNone:date=self._get_last_date(tick_type=tick_type)iftimezoneisNone:timezone=configuration.config.tziftick_typeisNone:tick_types=self.tick_types(date=date,timezone=timezone)iflen(tick_types)==0:raiseException("No tick types has found and specified")iflen(tick_types)>1:raiseException("Database has multiple tick types, please specify using the `tick_type` parameter")tick_type=tick_types[0]ifdateisNone:# it might happen when a database has no data on disksreturn{}# Convert explicitly into the datetime.date, because min_date and date# could be date or datetime types, and datetime is not comparable with datetime.datedate=_datetime2date(date)defget_schema(use_cache:bool=True):returnotp.run(otq.DbShowTickTypes(use_cache=use_cache,show_schema=True,include_memdb=True)>>otq.WhereClause(where=f'TICK_TYPE_NAME="{tick_type}"'),symbols=f'{self.name}::',start=date,end=date+timedelta(days=1),timezone=timezone)result=get_schema(use_cache=True)ifnotlen(result):# in case cache settings in database are bad (e.g. BEXRTS-1220)result=get_schema(use_cache=False)iflen(result):# filter schema by datedate_to_filter=Noneiforig_date:# if date is passed as a parameter -- then use itdate_to_filter=dateelse:# otherwise use the closest datedate_to_filter=result['Time'].max()result=result[(result['Time']>=pd.Timestamp(date_to_filter))]fields=zip(result['FIELD_NAME'].tolist(),result['FIELD_TYPE_NAME'].tolist(),result['FIELD_SIZE'].tolist())else:fields=[]schema={}forfname,ftype,fsizeinfields:dtype=Noneif'UINT32'inftype:dtype=otp.uintelif'UINT64'inftype:dtype=otp.ulongelif'INT8'inftype:dtype=otp.byteelif'INT16'inftype:dtype=otp.shortelif'INT'inftype:dtype=intelif'MSEC'inftype:dtype=otp.msectimeelif'NSEC'inftype:dtype=otp.nsectimeelif'DOUBLE'inftypeor'FLOAT'inftype:dtype=floatelif'DECIMAL'inftype:dtype=otp.decimalelif'VARSTRING'inftype:dtype=otp.varstringelif'STRING'inftype:iffsize==64:dtype=strelse:dtype=otp.string[fsize]else:warnings.warn(f"Unsupported field type '{ftype}' for field '{fname}'. ""Note that this field will be ignored ""and will not be added to the python schema, ""but will still remain in the OneTick schema.")continueschema[fname]=dtypereturnschema
[docs]defsymbols(self,date=None,timezone=None,tick_type=None,pattern='.*')->List[str]:''' Finds a list of available symbols in the database Parameters ---------- date: :class:`otp.dt <onetick.py.datetime>`, ``datetime.datetime``, optional Date for the symbols look up. ``None`` means the :attr:`last_date` tick_type: str, optional Tick type for symbols. ``None`` means union across all tick types. timezone: str, optional Timezone for the lookup. ``None`` means the default timezone. pattern: str Regular expression to select symbols. Examples -------- >>> nyse_taq_db = otp.databases()['NYSE_TAQ'] >>> nyse_taq_db.symbols(date=otp.dt(2022, 3, 1), tick_type='TRD', pattern='^AAP.*') ['AAP', 'AAPL'] '''importonetick.pyasotpifdateisNone:date=self.last_dateiftimezoneisNone:timezone=configuration.config.tziftick_typeisNone:tick_type=''eps=otq.FindDbSymbols(pattern='%',tick_type_field=tick_type) \
>>otq.AddField(field='SYMBOL',value='token(SYMBOL_NAME, -1, ":")') \
>>otq.WhereClause(where=f'regex_match(SYMBOL, "{pattern}")') \
>>otq.Table(fields='string SYMBOL')result=otp.run(eps,symbols=f'{self.name}::',start=date,end=date+timedelta(days=1),timezone=timezone)iflen(result)==0:return[]returnresult['SYMBOL'].tolist()
[docs]defdatabases(context=None,derived=False)->Dict[str,DB]:""" Gets all available databases in the ``context`` Parameters ---------- context: str, optional If it is not set then the default context from config is used derived: bool, dict If False then derived databases are not returned. If dict then its items used as parameters to :py:func:`~onetick.py.derived_databases` If True then default parameters for :py:func:`~onetick.py.derived_databases` are used. See also -------- | **SHOW_DB_LIST** OneTick event processor | :py:func:`derived_databases` Returns ------- dict Dict where keys are database names and values are :class:`DB <onetick.py.db._inspection.DB>` class. """ifcontextisNone:context=configuration.config.contextdbs=otp.run(otq.ShowDbList().tick_type("ANY"),symbols='LOCAL::',# start and end times don't matter for this query, use some constantsstart=db_constants.DEFAULT_START_DATE,end=db_constants.DEFAULT_END_DATE,context=context)db_list=list(dbs['DATABASE_NAME'])db_dict={db_name:DB(db_name)fordb_nameindb_list}ifderived:kwargs=derivedifisinstance(derived,dict)else{}kwargs.setdefault('context',context)db_dict.update(derived_databases(**kwargs))returndb_dict
[docs]defderived_databases(context=None,start=None,end=None,selection_criteria='all',db=None,db_discovery_scope='query_host_only',)->Dict[str,DB]:""" Gets available derived databases. Parameters ---------- context: str, optional If it is not set then the default context from config is used. start: datetime, optional If both ``start`` and ``end`` are set, then listing databases in this range only. Otherwise list databases from all configured time ranges for databases. end: datetime, optional If both ``start`` and ``end`` are set, then listing databases in this range only. Otherwise list databases from all configured time ranges for databases. selection_criteria: str Possible values: *all*, *derived_from_current_db*, *direct_children_of_current_db*. db: str, optional Specifies database name if ``selection_criteria`` is set to *derived_from_current_db* or *direct_children_of_current_db*. Must be set in this case, otherwise does nothing. db_discovery_scope: str When *query_host_and_all_reachable_hosts* is specified, an attempt will be performed to get derived databases from all reachable hosts. When *query_host_only* is specified, only derived databases from the host on which the query is performed will be returned. See also -------- **SHOW_DERIVED_DB_LIST** OneTick event processor Returns ------- dict Dict where keys are database names and values are :class:`DB <onetick.py.db._inspection.DB>` class. """ifcontextisNone:context=configuration.config.contextifstartandend:time_range=otq.ShowDerivedDbList.TimeRange.QUERY_TIME_INTERVALelse:# start and end times don't matter in this case, use some constantsstart=db_constants.DEFAULT_START_DATEend=db_constants.DEFAULT_END_DATEtime_range=otq.ShowDerivedDbList.TimeRange.CONFIGURED_TIME_INTERVALselection_criteria=getattr(otq.ShowDerivedDbList.SelectionCriteria,selection_criteria.upper())db_discovery_scope=getattr(otq.ShowDerivedDbList.DbDiscoveryScope,db_discovery_scope.upper())ifselection_criteria!=otq.ShowDerivedDbList.SelectionCriteria.ALLandnotdb:raiseValueError(f"Parameter 'db' must be set when parameter 'selection_criteria' is {selection_criteria}")ep=otq.ShowDerivedDbList(time_range=time_range,selection_criteria=selection_criteria,db_discovery_scope=db_discovery_scope,)ep=ep.tick_type('ANY')db=dbor'LOCAL'dbs=otp.run(ep,symbols=f'{db}::',start=start,end=end,context=context)db_list=list(dbs['DERIVED_DB_NAME'])return{db_name:DB(db_name)fordb_nameindb_list}