import os
import warnings
import shutil
from datetime import datetime
from typing import List
from locator_parser.io import FileReader, FileWriter, PrintWriter
from locator_parser.actions import Add, GetAll, Delete
from locator_parser.common import apply_actions
from locator_parser import locator as _locator
from locator_parser import acl as _acl
from abc import ABC, abstractmethod
from onetick.lib.instance import OneTickLib
from . import utils
from . import license as _license
from . import db as _db
from . import servers as _servers
from .configuration import config
from .db._inspection import databases as _databases
class EntityOperationFailed(Exception):
"""
Raise when operation with an entity for a config
in this module failed
"""
class MultipleSessionsException(Exception):
"""
Raises when user tries to initiate a new one Session
when another one is already used
"""
def _apply_to_entity(cfg, entity, func, roll_back_func):
"""
Function generalizes operations for locators and ACLs:
tries to apply query function, and calls roll_back_func if
the query fails
"""
res = func(entity)
try:
cfg.reload()
except: # noqa
roll_back_func(entity)
raise
return res
class _FileHandler_(ABC):
def __init__(self, file_h=None, clean_up=True, copied=True, session_ref=None):
self._file = file_h
self._clean_up = clean_up
# flag to understand whether we work with externally passed files;
# it is set and affects logic, when copy=False
self._copied = copied
self._session_ref = session_ref
@property
def path(self):
return self._file.path
@property
def file(self):
return self._file
def copy(self, clean_up=True, copy=True, session_ref=None):
return self.__class__(self.path, clean_up=clean_up, copy=copy, session_ref=session_ref) # pylint: disable=E1123
@abstractmethod
def cleanup(self):
pass
@staticmethod
def _db_in_dbs_case_insensitive(db_id: str, databases: List[str]):
for db_name in databases:
if db_id.upper() == db_name.upper():
return True
return False
class _CommonBuilder_(ABC):
def __init__(self, src=None, clean_up=True, copy=True, session_ref=None):
self.src = src
self.clean_up = clean_up
self.copy = copy
self.session_ref = session_ref
@abstractmethod
def build(self):
pass
[docs]class ACL(_FileHandler_):
class User(str):
"""
Subclass represents an ACL user
"""
pass
def __init__(self, path=None, clean_up=True, copy=True, session_ref=None):
"""
Class representing OneTick database access list file.
ACL is the file that describes the list of the users
that are allowed to access the database and what permissions do they have.
Parameters
----------
path: str
A path to custom acl file. Default is `None`, that means to generate a temporary acl file.
clean_up: bool
If `True`, then temporary acl file will be removed when ACL object will be destroyed. It is
helpful for debug purpose. Default is `True`.
copy: bool
If `True`, then the passed custom acl file by the ``path`` parameter will be copied first before
usage. It might be used when you want to work with a custom acl file, but don't want to change
the original file; in that case a custom acl file will be copied into a temporary file and
every request for modification will be executed for that temporary file. Default is `True`.
"""
copied = True
# TODO: implement this logic later
# if copy is None and path is not None:
# # if copy rule is not specified, but path is specified
# # then we set copy to True with safety goal, otherwise
# # we would might change a permanent file without user
# # acknowledgment
# copy = True
# raise Warning("You set the ACL file, but have not specify a copy rule. "
# "We copy it with safety goal, and it means you will work "
# "with copied file instead of passed. If you want to work "
# "with passed file directly, then you could set the 'copy' "
# "parameter to True.")
# if path is set, then copy file, we should not work directly
# with externally passed files
if copy:
if path:
file_h = utils.TmpFile(suffix=".acl", clean_up=clean_up)
shutil.copyfile(path, file_h.path)
else:
file_h = utils.tmp_acl(clean_up=clean_up)
else:
if path:
file_h = utils.PermanentFile(path)
copied = False
else:
file_h = utils.tmp_acl(clean_up=clean_up)
assert file_h is not None
super().__init__(file_h, clean_up=clean_up, copied=copied, session_ref=session_ref)
self._added_dbs = []
def cleanup(self):
for db in self._added_dbs:
self._remove_db(db)
self._added_dbs = []
self.reload()
def _add_db(self, db):
permissions = {}
if db._write:
permissions["write_access"] = "true"
if hasattr(db, "_destroy_access") and db._destroy_access:
permissions["destroy_access"] = "true"
add_db = Add(_acl.DB(id=db.id, read_access="true"))
res = apply_actions(_acl.parse_acl, FileReader(self.path), FileWriter(self.path), [add_db], flush=True)
if not res:
return res
if permissions:
add_allow = Add(_acl.Allow(role="Admin", **permissions))
add_allow.add_where(_acl.DB, id=db.id)
return apply_actions(_acl.parse_acl, FileReader(self.path), FileWriter(self.path), [add_allow], flush=True)
return True
def _remove_db(self, db):
remove_db = Delete()
remove_db.add_where(_acl.DB, id=db.id)
return apply_actions(_acl.parse_acl, FileReader(self.path), FileWriter(self.path), [remove_db], flush=True)
def _add_user(self, user):
add_user = Add(_acl.User(name=user))
add_user.add_where(_acl.Role, name="Admin")
return apply_actions(_acl.parse_acl, FileReader(self.path), FileWriter(self.path), [add_user], flush=True)
def _remove_user(self, user):
remove_user = Delete()
remove_user.add_where(_acl.Role, name="Admin")
remove_user.add_where(_acl.User, name=user)
return apply_actions(_acl.parse_acl, FileReader(self.path), FileWriter(self.path), [remove_user], flush=True)
def add(self, entity):
"""
Add entity to the ACL and reload it. If it failes, then tries to roll basck to
the starting state
Parameters
---------
entity : DB or ACL.User
Raises
------
TypeError, EntityOperationFailed
"""
if isinstance(entity, _db.DB):
if not self._db_in_dbs_case_insensitive(entity.id, self.databases):
if not _apply_to_entity(self, entity, self._add_db, self._remove_db):
raise EntityOperationFailed(f'Adding "{entity}" db to the acl "{self.path}" has failed :(')
self._added_dbs.append(entity)
else:
pass # TODO: need to think what to do in case when we add database with the same id
elif isinstance(entity, ACL.User):
if not _apply_to_entity(self, entity, self._add_user, self._remove_user):
raise EntityOperationFailed(f'Adding "{entity}" user to the acl "{self.path}" has failed :(')
else:
raise TypeError(f'Entity of type "{type(entity)}" is not supported')
def remove(self, entity):
"""
Remove entity from the ACL and reload it. If it failes, then tries to roll back to
the starting state.
Parameters
----------
entity : DB or ACL.User
Raises
------
ValueError, TypeError, EntityOperationFailed
"""
if isinstance(entity, _db.DB):
if entity not in self._added_dbs:
raise ValueError(f'DB "{entity}" was not added')
if not _apply_to_entity(self, entity, self._remove_db, self._add_db):
raise EntityOperationFailed(f'Remove "{entity}" db from the acl "{self.path}" has failed :(')
self._added_dbs.remove(entity)
elif isinstance(entity, ACL.User):
if not _apply_to_entity(self, entity, self._remove_user, self._add_user):
raise EntityOperationFailed(f'Remove "{entity}" user from the acl "{self.path}" has failed :(')
else:
raise TypeError(f'Entity of type "{type(entity)}" is not supported')
def reload(self, db=None):
if self._session_ref is not None:
return utils.reload_config(db, config_type='ACCESS_LIST')
def _read_dbs(self):
get_db = GetAll()
get_db.add_where(_acl.DB)
apply_actions(_acl.parse_acl, FileReader(self.path), PrintWriter(), [get_db])
return list(map(lambda x: x.id, get_db.result))
def _dbs(self):
action = GetAll()
action.add_where(_acl.DB)
apply_actions(_acl.parse_acl, FileReader(self.path), PrintWriter(), [action])
return list(map(lambda x: x.id, action.result))
def _users(self):
action = GetAll()
action.add_where(_acl.Role, name="Admin")
action.add_where(_acl.User)
apply_actions(_acl.parse_acl, FileReader(self.path), PrintWriter(), [action])
return list(map(lambda x: x.name, action.result))
@property
def databases(self):
return self._dbs()
@property
def users(self):
return self._users()
class ACLBuilder(_CommonBuilder_):
def build(self):
params = {"clean_up": self.clean_up, "copy": self.copy, "session_ref": self.session_ref}
if isinstance(self.src, str):
return ACL(self.src, **params)
elif isinstance(self.src, utils.File):
return ACL(self.src.path, **params)
elif isinstance(self.src, ACL):
return self.src.copy(**params)
elif self.src is None:
return ACL(**params)
raise ValueError(f'It is not allowed to build ACL from the object of type "{type(self.src)}"')
[docs]class Locator(_FileHandler_):
def __init__(self, path=None, clean_up=True, copy=True, empty=False, session_ref=None):
"""
Class representing OneTick database locator.
Locator is the file that describes database name, location and other options.
Parameters
----------
path: str
A path to custom locator file. Default is `None`, that means to generate a temporary locator.
clean_up: bool
If True, then temporary locator will be removed when Locator object will be destroyed. It is
helpful for debug purpose. Default is `True`.
copy: bool
If `True`, then the passed custom locator by the ``path`` parameter will be copied firstly before
usage. It might be used when you want to work with a custom locator, but don't want to change
the original file; in that case a custom locator will be copied into a temporary locator and
every request for modification will be executed for that temporary locator. Default is `True`.
empty: bool
If `True`, then a temporary locator will have no databases, otherwise it will have default
otp.config.default_db, BESTEX, ALERTS, COMMON and WORKFLOW databases. Default is `False`.
"""
copied = True
# if path is set, then copy file, we should not work directly
# with externally passed files
if copy:
if path:
file_h = utils.TmpFile(".locator", clean_up=clean_up)
shutil.copyfile(path, file_h)
else:
file_h = utils.tmp_locator(clean_up=clean_up, empty=empty)
else:
if path:
file_h = utils.PermanentFile(path)
copied = False
else:
file_h = utils.tmp_locator(clean_up=clean_up, empty=empty)
assert file_h is not None
super().__init__(file_h, clean_up=clean_up, copied=copied, session_ref=session_ref)
self._added_dbs = []
self._added_ts = []
def cleanup(self):
for db in self._added_dbs:
self._remove_db(db)
for server in self._added_ts:
self._remove_ts(str(server))
self._added_dbs = []
self._added_ts = []
self.reload()
@property
def databases(self):
return self._dbs()
@property
def tick_servers(self):
return self._ts()
def reload(self, db_=None):
if self._session_ref is not None:
return utils.reload_config(db_, config_type='LOCATOR')
def _dbs(self):
action = GetAll()
action.add_where(_locator.DB)
apply_actions(_locator.parse_locator, FileReader(self.path), PrintWriter(), [action])
return list(map(lambda x: x.id, action.result))
def _ts(self):
get_ts = GetAll()
get_ts.add_where(_locator.TickServers)
get_ts.add_where(_locator.ServerLocation)
apply_actions(_locator.parse_locator, FileReader(self.path), PrintWriter(), [get_ts])
return [location.location for location in get_ts.result]
def _add_db(self, db):
res = apply_actions(
_locator.parse_locator,
FileReader(self.path),
FileWriter(self.path),
[Add(_locator.DB(id=db.id, **db.properties))],
flush=True,
)
if not res:
return res
result = True
for location in db.locations:
add_location = Add(_locator.Location(**location))
add_location.add_where(_locator.DB, id=db.id)
result &= apply_actions(
_locator.parse_locator, FileReader(self.path), FileWriter(self.path), [add_location], flush=True
)
return result
def _remove_db(self, db):
remove_db = Delete()
remove_db.add_where(_locator.DB, id=db.id)
return apply_actions(
_locator.parse_locator, FileReader(self.path), FileWriter(self.path), [remove_db], flush=True
)
def _add_ts(self, server):
"""
Add server to locator file (without reloading)
Parameters
----------
server : RemoteTS
Server to be added to locator.
"""
actions = []
if server.cep:
actions.append(Add(_locator.CEPServerLocation(location=str(server))))
else:
actions.append(Add(_locator.ServerLocation(location=str(server))))
return apply_actions(
_locator.parse_locator,
FileReader(self.path),
FileWriter(self.path),
actions,
flush=True,
)
def _remove_ts(self, server):
"""
Remove server from locator file (without reloading)
Parameters
----------
server : RemoteTS
Server to remove from locator
"""
remove_db = Delete()
remove_db.add_where(_locator.ServerLocation, location=str(server))
return apply_actions(
_locator.parse_locator, FileReader(self.path), FileWriter(self.path), [remove_db], flush=True
)
def _add_locator(self, locator):
"""
Add reference to a locator
Parameters
---------
locator : Locator
"""
return apply_actions(
_locator.parse_locator,
FileReader(self.path),
FileWriter(self.path),
[Add(_locator.Include(path=locator.path))],
flush=True,
)
def _remove_locator(self, locator):
"""
Remove reference for a locator
Parameters
----------
locator : Locator
"""
remove_locator = Delete()
remove_locator.add_where(_locator.Include, path=locator.path)
return apply_actions(
_locator.parse_locator, FileReader(self.path), FileWriter(self.path), [remove_locator], flush=True
)
def add(self, entity):
"""
Add entity to the locator and reload it. If it failes, then tries to roll back to
the starting state
Parameters
----------
entity : DB, RemoteTS or Locator
Raises
------
TypeError, EntityOperationFailed
"""
if isinstance(entity, _db.db._DB):
if not self._db_in_dbs_case_insensitive(entity.id, self.databases):
if not _apply_to_entity(self, entity, self._add_db, self._remove_db):
raise EntityOperationFailed(f'Add "{entity}" db to the locator "{self.path}" has failed :(')
self._added_dbs.append(entity)
else:
warnings.warn(f"Database {entity.id} is already used and will not be rewritten with this command. "
f"Notice that databases' names are case insensitive. ")
elif isinstance(entity, _servers.RemoteTS):
if not _apply_to_entity(self, entity, self._add_ts, self._remove_ts):
raise EntityOperationFailed(
f'Add "{entity}" remote ts into the locator "{self.path}" has failed :('
)
self._added_ts.append(entity)
elif isinstance(entity, Locator):
if not _apply_to_entity(self, entity, self._add_locator, self._remove_locator):
raise EntityOperationFailed(
f'Add "{entity}" locator reference to the locator "{self.path}" has failed :('
)
else:
raise TypeError(f'Entity of type "{type(entity)}" is not supported')
def remove(self, entity):
"""
Remove entity from the locator and reload it. If it failes, then tries to roll back to
the starting state.
Raises
------
ValueError, TypeError, EntityOperationFailed
"""
if isinstance(entity, _db.db._DB):
if entity not in self._added_dbs:
raise ValueError(f'DB "{entity}" was not added')
if not _apply_to_entity(self, entity, self._remove_db, self._add_db):
raise EntityOperationFailed(f'Remove "{entity}" db from the locator "{self.path}" has failed :(')
self._added_dbs.remove(entity)
elif isinstance(entity, _servers.RemoteTS):
if entity not in self._added_ts:
raise ValueError(f'Tick server "{entity}" was not added')
if not _apply_to_entity(self, entity, self._remove_ts, self._add_ts):
raise EntityOperationFailed(
f'Remove "{entity}" ts record from the locator "{self.path}" has failed :('
)
self._added_ts.remove(entity)
elif isinstance(entity, Locator):
if not _apply_to_entity(self, entity, self._remove_locator, self._add_locator):
raise EntityOperationFailed(
f'Remove "{entity.path}" locator reference from the locator "{self.path}" has failed :('
)
else:
raise TypeError(f'Entity of type "{type(entity)}" is not supported')
def __contains__(self, item):
if str(item) in self.databases:
return True
return False
class LocatorBuilder(_CommonBuilder_):
def build(self):
params = {"clean_up": self.clean_up, "copy": self.copy, "session_ref": self.session_ref}
if isinstance(self.src, str):
return Locator(self.src, **params)
elif isinstance(self.src, utils.File):
return Locator(self.src.path, **params)
elif isinstance(self.src, Locator):
return self.src.copy(**params)
elif isinstance(self.src, _servers.RemoteTS):
locator = Locator(empty=True, **params)
locator.add(self.src)
return locator
elif self.src is None:
return Locator(**params)
raise ValueError(f'It is not allowed to build Locator from the object of type "{type(self.src)}"')
[docs]class Config(_FileHandler_):
def __init__(
self,
config=None,
locator=None,
acl=None,
otq_path=None,
csv_path=None,
clean_up=True,
copy=True,
session_ref=None,
license=None,
):
"""
Parameters
----------
config: path or Config
Allows to specify a custom config. None is to use temporary generated config. Default is None.
locator: Locator
Allows to specify a custom locator file. None is to use temporary generated locator. Default is None.
acl: ACL
Allows to specify a custom acl file. None is to use temporary generated acl. Default is None.
otq_path: list of paths to lookup queries
OTQ_PATH parameter in the OneTick config file. Default is None, that is equal to the empty list.
csv_path: list of paths to lookup csv files
CSV_PATH parameter in the OneTick config file. Default is None, that is equal to the empty list.
clean_up: bool
If True, then temporary config file will be removed when the Config instance will be destroyed.
It is helpful for debug purpose. Default is True.
copy: bool
If True, then the passed custom config file will be copied firstly before any usage with it.
It might be used when you want to work with a custom config file, but don't want to change to
change the original file; in that case a custom config will be copied into a temporary config
file and every request for modification will be executed for that temporary config. Default
is True.
license: instance from the onetick.py.license module
License to use. If it is not set, then onetick.py.license.Default is used.
"""
if config and (locator or acl):
raise ValueError("It is not allowed to use 'config' parameter along with 'locator' or 'acl'")
# builders that construct locator and acl based on parameters
acl_builder = ACLBuilder(src=acl, clean_up=clean_up, copy=copy, session_ref=session_ref)
locator_builder = LocatorBuilder(src=locator, clean_up=clean_up, copy=copy, session_ref=session_ref)
config_copied = True
if config:
# copy passed file, we should not work with externally passed files
if copy:
self._file = utils.TmpFile(".cfg", clean_up=clean_up)
config_path = config.path if isinstance(config, Config) else config
shutil.copyfile(config_path, self._file.path)
else:
self._file = utils.PermanentFile(config)
config_copied = False
if utils.is_param_in_config(self._file.path, "ACCESS_CONTROL_FILE"):
acl_builder.src = utils.get_config_param(self._file.path, "ACCESS_CONTROL_FILE")
if utils.is_param_in_config(self._file.path, "DB_LOCATOR.DEFAULT"):
locator_builder.src = utils.get_config_param(self._file.path, "DB_LOCATOR.DEFAULT")
else:
self._file = utils.tmp_config(clean_up=clean_up)
self._acl = acl_builder.build()
os.environ["ONE_TICK_SESSION_ACL_PATH"] = self._acl.path
self._locator = locator_builder.build()
os.environ["ONE_TICK_SESSION_LOCATOR_PATH"] = self._locator.path
super().__init__(self._file, clean_up=clean_up, copied=config_copied, session_ref=session_ref)
# Here we can start to modify files - they are either copied or generated
# ------------------------------------------------------------------------
utils.modify_config_param(self.path, "ACCESS_CONTROL_FILE", self._acl.path, throw_on_missing=False)
utils.modify_config_param(self.path, "DB_LOCATOR.DEFAULT", self._locator.path, throw_on_missing=False)
if otq_path:
otq_path = map(str, otq_path)
utils.modify_config_param(self.path, "OTQ_FILE_PATH", ",".join(otq_path), throw_on_missing=False)
if csv_path:
csv_path = map(str, csv_path)
utils.modify_config_param(self.path, "CSV_FILE_PATH", ",".join(csv_path), throw_on_missing=False)
# set license
# ---------------------------
custom_license = utils.is_param_in_config(self.path, "LICENSE_REPOSITORY_DIR")
custom_license &= utils.is_param_in_config(self.path, "ONE_TICK_LICENSE_FILE")
if license:
self._license = license
else:
if custom_license:
lic_file = utils.get_config_param(self.path, "ONE_TICK_LICENSE_FILE")
lic_dir = utils.get_config_param(self.path, "LICENSE_REPOSITORY_DIR")
self._license = _license.Custom(lic_file, lic_dir)
else:
if isinstance(locator, _servers.RemoteTS):
self._license = _license.Remote()
else:
self._license = _license.Default()
if not custom_license: # no need to set already defined custom values
if self._license.dir:
utils.modify_config_param(self.path,
"LICENSE_REPOSITORY_DIR",
self._license.dir,
throw_on_missing=False)
if self._license.file:
utils.modify_config_param(self.path,
"ONE_TICK_LICENSE_FILE",
self._license.file,
throw_on_missing=False)
@property
def acl(self):
return self._acl
@property
def locator(self):
return self._locator
@property
def license(self):
return self._license
def copy(self, clean_up=True, copy=True, session_ref=None):
""" overridden version of copy """
return self.__class__(self.path, clean_up=clean_up, copy=copy, session_ref=session_ref, license=self._license)
@staticmethod
def build(obj=None, clean_up=True, copy=True, session_ref=None):
params = {"clean_up": clean_up, "copy": copy, "session_ref": session_ref}
if isinstance(obj, str):
return Config(obj, **params)
elif isinstance(obj, utils.File):
return Config(obj.path, **params)
elif isinstance(obj, Config):
return obj.copy(**params)
elif obj is None:
return Config(**params)
raise ValueError(f'It is not allowed to build Config from the object of type "{type(obj)}"')
def cleanup(self):
# no logic to clean up content
self._acl.cleanup()
self._locator.cleanup()
@property
def otq_path(self):
try:
return utils.get_config_param(self.path, "OTQ_FILE_PATH")
except AttributeError:
return None
[docs]class Session(object):
"""
A class for setting up working OneTick session. It keeps configuration files during
the session and allows to manage them. When instance is out of scope, then instance
cleans up config files and configuration.
You can leave the scope manually with method :py:meth:`close`.
Also, session is closed automatically if this object is used as a context manager.
It is allowed to have only one alive session instance in the process.
If you don't use Session's instance, then ``ONE_TICK_CONFIG`` environment variable
should be set to be able to work with OneTick.
If config file is not set then temporary is generated.
Config includes locator and acl file, and if they are not set, then they are generated.
Parameters
----------
config : str, Config, optional
Path to an existing OneTick config file; if it is not set, then config will be generated.
If config is not set, then temporary config is generated. Default is None.
clean_up : bool, optional
A flag to control cleaning up process: if it is True then all temporary generated files will
be automatically removed. It is helpful for debugging. The flag affects only generated files, but
does not externally passed. Default is True.
copy : bool, optional
A flag to control file copy process: if it is True then all externally passed files will be
copied before usage, otherwise all modifications during an existing session happen directly with
passed config files. NOTE: we suggest to set this flag only when you fully understand it's effect.
Default is True.
override_env : bool, optional
If flag is True, then unconditionally ``ONE_TICK_CONFIG`` environment variable will be overriden
with a config that belongs to a Session. Otherwise ``ONE_TICK_CONFIG``
will be defined in the scope of session only when it is not defined externally.
For example, it is helpful when you test ascii_loader that uses 'ONE_TICK_CONFIG' only.
Default is False ( default is False, because overriding external evnironment variable
might be not obvious and desirable )
redirect_logs: bool, optional
If flag is True, then OneTick logs will be redirected into a temporary log file. Otherwise
logs will be mixed with output. Default is True.
Examples
--------
If session is defined with environment, OneTick can be used right away:
>>> 'ONE_TICK_CONFIG' in os.environ
True
>>> list(otp.databases()) # doctest: +ELLIPSIS
['COMMON', 'DEMO_L1', ..., 'SOME_DB']
>>> otp.DataSource('SOME_DB', symbol='S1', tick_type='TT').to_df()
Time X
0 2003-12-01 00:00:00.000 1
1 2003-12-01 00:00:00.001 2
2 2003-12-01 00:00:00.002 3
"""
# TODO: create article for Session in Guides or Concepts
_instance = None
def __init__(self, config=None, clean_up=True, copy=True, override_env=False, redirect_logs=True):
self._construct(config, clean_up, copy, override_env, redirect_logs)
def _construct(self, config=None, clean_up=True, copy=True, override_env=False, redirect_logs=True):
if Session._instance:
raise MultipleSessionsException(
"It is forbidden to use multiple active sessions simultaniously in one process"
)
def onetick_cfg_rollback(var):
"""
function to rollback ONE_TICK_CONFIG state
"""
def _impl():
if var is None:
if "ONE_TICK_CONFIG" in os.environ:
del os.environ["ONE_TICK_CONFIG"]
else:
os.environ["ONE_TICK_CONFIG"] = var
return _impl
self._lib = None
self._env_rollback = onetick_cfg_rollback(os.environ.get("ONE_TICK_CONFIG", None))
self._override_env = override_env
self._config = Config.build(config, clean_up=clean_up, copy=copy, session_ref=self)
os.environ["ONE_TICK_SESSION_CFG_PATH"] = self._config.path
try:
if "ONE_TICK_CONFIG" not in os.environ:
os.environ["ONE_TICK_CONFIG"] = self._config.path
else:
if override_env:
os.environ["ONE_TICK_CONFIG"] = self._config.path
else:
warnings.warn(
UserWarning(
"ONE_TICK_CONFIG env variable has been set before a session, "
"and in the session scope it is not related to the session config. "
"If you want to make ONE_TICKC_CONFIG env variable be consistent "
"with the session, then look at the override_env flag "
"for the Session constructor"
)
)
OneTickLib().cleanup()
self._log_file = utils.TmpFile(suffix=".onetick.log", clean_up=clean_up)
self._lib = OneTickLib(self._config.path, log_file=self._log_file.path)
except: # noqa
self._env_rollback()
# TODO: rollback, but need to wait BDS-91
raise
Session._instance = self
self._ts_dbs = {}
[docs] def use(self, *items):
"""
Makes DB or TS available inside the session.
Parameters
----------
items : :py:class:`~onetick.py.DB` or :py:class:`~onetick.py.servers.RemoteTS` objects
Items to be added to session.
Examples
--------
(note that ``session`` is created before this example)
>>> list(otp.databases()) # doctest: +ELLIPSIS
['COMMON', 'DEMO_L1', ..., 'SOME_DB']
>>> new_db = otp.DB('ZZZZ')
>>> session.use(new_db)
>>> list(otp.databases()) # doctest: +ELLIPSIS
['COMMON', 'DEMO_L1', ..., 'SOME_DB', 'ZZZZ']
"""
for item in items:
self.locator.add(item)
if isinstance(item, _db.db._DB):
try:
self.acl.add(item)
except: # noqa
self.locator.remove(item)
raise
[docs] def use_stub(self, stub_name):
"""
Adds stub-DB into the session.
The shortcut for ``.use(otp.DB(stub_name))``
Parameters
----------
stub_name : str
name of the stub
"""
return self.use(_db.DB(stub_name))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@staticmethod
def _available_dbs():
return _databases()
def _get_ts_dbs(self):
locator_dbs = self.locator.databases
all_dbs = self._available_dbs()
for db_name in all_dbs:
if db_name not in locator_dbs:
if db_name not in self._ts_dbs:
self._ts_dbs[db_name] = _db.db._DB(db_name)
[docs] def close(self):
"""
Close session
"""
if Session._instance == self:
try:
if self._config:
del self._config
self._config = None
finally:
if self._lib:
self._lib.cleanup()
self._lib = None
self._env_rollback()
# del os.environ['ONE_TICK_SESSION_CFG_PATH']
# del os.environ['ONE_TICK_SESSION_ACL_PATH']
# del os.environ['ONE_TICK_SESSION_LOCATOR_PATH']
Session._instance = None
def __del__(self):
self.close()
@property
def config(self):
"""
A reference to the underlying Config object that represents OneTick config file.
Returns
-------
:py:class:`onetick.py.session.Config`
"""
return self._config
@config.setter
def config(self, cfg):
self.close()
self._construct(cfg, override_env=self._override_env)
@property
def acl(self):
"""
A reference to the underlying ACL object that represents OneTick access control list file.
Returns
-------
:py:class:`onetick.py.session.ACL`
"""
return self._config.acl
@property
def locator(self):
"""
A reference to the underlying Locator that represents OneTick locator file.
Returns
-------
:py:class:`onetick.py.session.Locator`
"""
return self._config.locator
@property
def license(self):
return self._config.license
@property
def ts_databases(self):
self._get_ts_dbs()
return self._ts_dbs
@property
def databases(self):
return self._available_dbs()
[docs]class TestSession(Session):
def __init__(self, *args, **kwargs):
"""
This class does the same as :py:class:`onetick.py.session.Session`
but also define default values.
"""
config['tz'] = 'EST5EDT'
config['default_db'] = 'DEMO_L1'
config['default_symbol'] = 'AAPL'
config['default_start_time'] = datetime(2003, 12, 1, 0, 0, 0)
config['default_end_time'] = datetime(2003, 12, 4, 0, 0, 0)
super().__init__(*args, **kwargs)