Source code for onetick.py.session

import os

import warnings
import shutil

from locator_parser.io import FileReader, FileWriter, PrintWriter
from locator_parser.actions import Add, GetAll, Delete
from locator_parser.common import apply_actions
from locator_parser import locator as _locator
from locator_parser import acl as _acl
from abc import ABC, abstractmethod
from onetick.lib.instance import OneTickLib
from . import utils
from . import license as _license
from . import db as _db
from . import servers as _servers
from .db._inspection import databases as _databases


class EntityOperationFailed(Exception):
    """
    Raise when operation with an entity for a config
    in this module failed
    """


class MultipleSessionsException(Exception):
    """
    Raises when user tries to initiate a new one Session
    when another one is already used
    """


def _apply_to_entity(cfg, entity, func, roll_back_func):
    """
    Function generalizes operations for locators and ACLs:
    tries to apply query function, and calls roll_back_func if
    the query fails
    """
    res = func(entity)

    try:
        cfg.reload()
    except:  # noqa
        roll_back_func(entity)
        raise

    return res


class _FileHandler_(ABC):
    def __init__(self, file_h=None, clean_up=True, copied=True, session_ref=None):
        self._file = file_h
        self._clean_up = clean_up
        # flag to understand whether we work with externally passed files;
        # it is set and affects logic, when copy=False
        self._copied = copied
        self._session_ref = session_ref

    @property
    def path(self):
        return self._file.path

    @property
    def file(self):
        return self._file

    def copy(self, clean_up=True, copy=True, session_ref=None):
        return self.__class__(self.path, clean_up=clean_up, copy=copy, session_ref=session_ref)  # pylint: disable=E1123

    @abstractmethod
    def cleanup(self):
        pass


class _CommonBuilder_(ABC):
    def __init__(self, src=None, clean_up=True, copy=True, session_ref=None):
        self.src = src
        self.clean_up = clean_up
        self.copy = copy
        self.session_ref = session_ref

    @abstractmethod
    def build(self):
        pass


[docs]class ACL(_FileHandler_): class User(str): """ Subclass represents an ACL user """ pass def __init__(self, path=None, clean_up=True, copy=True, session_ref=None): """ Class representing OneTick database access list file. ACL is the file that describes the list of the users that are allowed to access the database and what permissions do they have. Parameters ---------- path: str A path to custom acl file. Default is `None`, that means to generate a temporary acl file. clean_up: bool If `True`, then temporary acl file will be removed when ACL object will be destroyed. It is helpful for debug purpose. Default is `True`. copy: bool If `True`, then the passed custom acl file by the ``path`` parameter will be copied first before usage. It might be used when you want to work with a custom acl file, but don't want to change the original file; in that case a custom acl file will be copied into a temporary file and every request for modification will be executed for that temporary file. Default is `True`. """ copied = True # TODO: implement this logic later # if copy is None and path is not None: # # if copy rule is not specified, but path is specified # # then we set copy to True with safety goal, otherwise # # we would might change a permanent file without user # # acknowledgment # copy = True # raise Warning("You set the ACL file, but have not specify a copy rule. " # "We copy it with safety goal, and it means you will work " # "with copied file instead of passed. If you want to work " # "with passed file directly, then you could set the 'copy' " # "parameter to True.") # if path is set, then copy file, we should not work directly # with externally passed files if copy: if path: file_h = utils.TmpFile(suffix=".acl", clean_up=clean_up) shutil.copyfile(path, file_h.path) else: file_h = utils.tmp_acl(clean_up=clean_up) else: if path: file_h = utils.PermanentFile(path) copied = False else: file_h = utils.tmp_acl(clean_up=clean_up) assert file_h is not None super().__init__(file_h, clean_up=clean_up, copied=copied, session_ref=session_ref) self._added_dbs = [] def cleanup(self): for db in self._added_dbs: self._remove_db(db) self._added_dbs = [] self.reload() def _add_db(self, db): permissions = {} if db._write: permissions["write_access"] = "true" if hasattr(db, "_destroy_access") and db._destroy_access: permissions["destroy_access"] = "true" add_db = Add(_acl.DB(id=db.id, read_access="true")) res = apply_actions(_acl.parse_acl, FileReader(self.path), FileWriter(self.path), [add_db], flush=True) if not res: return res if permissions: add_allow = Add(_acl.Allow(role="Admin", **permissions)) add_allow.add_where(_acl.DB, id=db.id) return apply_actions(_acl.parse_acl, FileReader(self.path), FileWriter(self.path), [add_allow], flush=True) return True def _remove_db(self, db): remove_db = Delete() remove_db.add_where(_acl.DB, id=db.id) return apply_actions(_acl.parse_acl, FileReader(self.path), FileWriter(self.path), [remove_db], flush=True) def _add_user(self, user): add_user = Add(_acl.User(name=user)) add_user.add_where(_acl.Role, name="Admin") return apply_actions(_acl.parse_acl, FileReader(self.path), FileWriter(self.path), [add_user], flush=True) def _remove_user(self, user): remove_user = Delete() remove_user.add_where(_acl.Role, name="Admin") remove_user.add_where(_acl.User, name=user) return apply_actions(_acl.parse_acl, FileReader(self.path), FileWriter(self.path), [remove_user], flush=True) def add(self, entity): """ Add entity to the ACL and reload it. If it failes, then tries to roll basck to the starting state Parameters --------- entity : DB or ACL.User Raises ------ TypeError, EntityOperationFailed """ if isinstance(entity, _db.DB): if entity.id not in self.databases: if not _apply_to_entity(self, entity, self._add_db, self._remove_db): raise EntityOperationFailed(f'Adding "{entity}" db to the acl "{self.path}" has failed :(') self._added_dbs.append(entity) else: pass # TODO: need to think what to do in case when we add database with the same id elif isinstance(entity, ACL.User): if not _apply_to_entity(self, entity, self._add_user, self._remove_user): raise EntityOperationFailed(f'Adding "{entity}" user to the acl "{self.path}" has failed :(') else: raise TypeError(f'Entity of type "{type(entity)}" is not supported') def remove(self, entity): """ Remove entity from the ACL and reload it. If it failes, then tries to roll back to the starting state. Parameters ---------- entity : DB or ACL.User Raises ------ ValueError, TypeError, EntityOperationFailed """ if isinstance(entity, _db.DB): if entity not in self._added_dbs: raise ValueError(f'DB "{entity}" was not added') if not _apply_to_entity(self, entity, self._remove_db, self._add_db): raise EntityOperationFailed(f'Remove "{entity}" db from the acl "{self.path}" has failed :(') self._added_dbs.remove(entity) elif isinstance(entity, ACL.User): if not _apply_to_entity(self, entity, self._remove_user, self._add_user): raise EntityOperationFailed(f'Remove "{entity}" user from the acl "{self.path}" has failed :(') else: raise TypeError(f'Entity of type "{type(entity)}" is not supported') def reload(self, db=None): if self._session_ref is not None: return utils.reload_config(db, config_type='ACCESS_LIST') def _read_dbs(self): get_db = GetAll() get_db.add_where(_acl.DB) apply_actions(_acl.parse_acl, FileReader(self.path), PrintWriter(), [get_db]) return list(map(lambda x: x.id, get_db.result)) def _dbs(self): action = GetAll() action.add_where(_acl.DB) apply_actions(_acl.parse_acl, FileReader(self.path), PrintWriter(), [action]) return list(map(lambda x: x.id, action.result)) def _users(self): action = GetAll() action.add_where(_acl.Role, name="Admin") action.add_where(_acl.User) apply_actions(_acl.parse_acl, FileReader(self.path), PrintWriter(), [action]) return list(map(lambda x: x.name, action.result)) @property def databases(self): return self._dbs() @property def users(self): return self._users()
class ACLBuilder(_CommonBuilder_): def build(self): params = {"clean_up": self.clean_up, "copy": self.copy, "session_ref": self.session_ref} if isinstance(self.src, str): return ACL(self.src, **params) elif isinstance(self.src, utils.File): return ACL(self.src.path, **params) elif isinstance(self.src, ACL): return self.src.copy(**params) elif self.src is None: return ACL(**params) raise ValueError(f'It is not allowed to build ACL from the object of type "{type(self.src)}"')
[docs]class Locator(_FileHandler_): def __init__(self, path=None, clean_up=True, copy=True, empty=False, session_ref=None): """ Class representing OneTick database locator. Locator is the file that describes database name, location and other options. Parameters ---------- path: str A path to custom locator file. Default is `None`, that means to generate a temporary locator. clean_up: bool If True, then temporary locator will be removed when Locator object will be destroyed. It is helpful for debug purpose. Default is `True`. copy: bool If `True`, then the passed custom locator by the ``path`` parameter will be copied firstly before usage. It might be used when you want to work with a custom locator, but don't want to change the original file; in that case a custom locator will be copied into a temporary locator and every request for modification will be executed for that temporary locator. Default is `True`. empty: bool If `True`, then a temporary locator will have no databases, otherwise it will have default otp.config.default_db, BESTEX, ALERTS, COMMON and WORKFLOW databases. Default is `False`. """ copied = True # if path is set, then copy file, we should not work directly # with externally passed files if copy: if path: file_h = utils.TmpFile(".locator", clean_up=clean_up) shutil.copyfile(path, file_h) else: file_h = utils.tmp_locator(clean_up=clean_up, empty=empty) else: if path: file_h = utils.PermanentFile(path) copied = False else: file_h = utils.tmp_locator(clean_up=clean_up, empty=empty) assert file_h is not None super().__init__(file_h, clean_up=clean_up, copied=copied, session_ref=session_ref) self._added_dbs = [] self._added_ts = [] def cleanup(self): for db in self._added_dbs: self._remove_db(db) for server in self._added_ts: self._remove_ts(str(server)) self._added_dbs = [] self._added_ts = [] self.reload() @property def databases(self): return self._dbs() @property def tick_servers(self): return self._ts() def reload(self, db_=None): if self._session_ref is not None: return utils.reload_config(db_, config_type='LOCATOR') def _dbs(self): action = GetAll() action.add_where(_locator.DB) apply_actions(_locator.parse_locator, FileReader(self.path), PrintWriter(), [action]) return list(map(lambda x: x.id, action.result)) def _ts(self): get_ts = GetAll() get_ts.add_where(_locator.TickServers) get_ts.add_where(_locator.ServerLocation) apply_actions(_locator.parse_locator, FileReader(self.path), PrintWriter(), [get_ts]) return [location.location for location in get_ts.result] def _add_db(self, db): res = apply_actions( _locator.parse_locator, FileReader(self.path), FileWriter(self.path), [Add(_locator.DB(id=db.id, **db.properties))], flush=True, ) if not res: return res result = True for location in db.locations: add_location = Add(_locator.Location(**location)) add_location.add_where(_locator.DB, id=db.id) result &= apply_actions( _locator.parse_locator, FileReader(self.path), FileWriter(self.path), [add_location], flush=True ) return result def _remove_db(self, db): remove_db = Delete() remove_db.add_where(_locator.DB, id=db.id) return apply_actions( _locator.parse_locator, FileReader(self.path), FileWriter(self.path), [remove_db], flush=True ) def _add_ts(self, server): """ Add server to locator file (without reloading) Parameters ---------- server : RemoteTS Server to be added to locator. """ actions = [] if server.cep: actions.append(Add(_locator.CEPServerLocation(location=str(server)))) else: actions.append(Add(_locator.ServerLocation(location=str(server)))) return apply_actions( _locator.parse_locator, FileReader(self.path), FileWriter(self.path), actions, flush=True, ) def _remove_ts(self, server): """ Remove server from locator file (without reloading) Parameters ---------- server : RemoteTS Server to remove from locator """ remove_db = Delete() remove_db.add_where(_locator.ServerLocation, location=str(server)) return apply_actions( _locator.parse_locator, FileReader(self.path), FileWriter(self.path), [remove_db], flush=True ) def _add_locator(self, locator): """ Add reference to a locator Parameters --------- locator : Locator """ return apply_actions( _locator.parse_locator, FileReader(self.path), FileWriter(self.path), [Add(_locator.Include(path=locator.path))], flush=True, ) def _remove_locator(self, locator): """ Remove reference for a locator Parameters ---------- locator : Locator """ remove_locator = Delete() remove_locator.add_where(_locator.Include, path=locator.path) return apply_actions( _locator.parse_locator, FileReader(self.path), FileWriter(self.path), [remove_locator], flush=True ) def add(self, entity): """ Add entity to the locator and reload it. If it failes, then tries to roll back to the starting state Parameters ---------- entity : DB, RemoteTS or Locator Raises ------ TypeError, EntityOperationFailed """ if isinstance(entity, _db.db._DB): if entity.id not in self.databases: if not _apply_to_entity(self, entity, self._add_db, self._remove_db): raise EntityOperationFailed(f'Add "{entity}" db to the locator "{self.path}" has failed :(') self._added_dbs.append(entity) else: warnings.warn(f"Database {entity.id} is already used and will not be rewritten with this command") elif isinstance(entity, _servers.RemoteTS): if not _apply_to_entity(self, entity, self._add_ts, self._remove_ts): raise EntityOperationFailed( f'Add "{entity}" remote ts into the locator "{self.path}" has failed :(' ) self._added_ts.append(entity) elif isinstance(entity, Locator): if not _apply_to_entity(self, entity, self._add_locator, self._remove_locator): raise EntityOperationFailed( f'Add "{entity}" locator reference to the locator "{self.path}" has failed :(' ) else: raise TypeError(f'Entity of type "{type(entity)}" is not supported') def remove(self, entity): """ Remove entity from the locator and reload it. If it failes, then tries to roll back to the starting state. Raises ------ ValueError, TypeError, EntityOperationFailed """ if isinstance(entity, _db.db._DB): if entity not in self._added_dbs: raise ValueError(f'DB "{entity}" was not added') if not _apply_to_entity(self, entity, self._remove_db, self._add_db): raise EntityOperationFailed(f'Remove "{entity}" db from the locator "{self.path}" has failed :(') self._added_dbs.remove(entity) elif isinstance(entity, _servers.RemoteTS): if entity not in self._added_ts: raise ValueError(f'Tick server "{entity}" was not added') if not _apply_to_entity(self, entity, self._remove_ts, self._add_ts): raise EntityOperationFailed( f'Remove "{entity}" ts record from the locator "{self.path}" has failed :(' ) self._added_ts.remove(entity) elif isinstance(entity, Locator): if not _apply_to_entity(self, entity, self._remove_locator, self._add_locator): raise EntityOperationFailed( f'Remove "{entity.path}" locator reference from the locator "{self.path}" has failed :(' ) else: raise TypeError(f'Entity of type "{type(entity)}" is not supported') def __contains__(self, item): if str(item) in self.databases: return True return False
class LocatorBuilder(_CommonBuilder_): def build(self): params = {"clean_up": self.clean_up, "copy": self.copy, "session_ref": self.session_ref} if isinstance(self.src, str): return Locator(self.src, **params) elif isinstance(self.src, utils.File): return Locator(self.src.path, **params) elif isinstance(self.src, Locator): return self.src.copy(**params) elif isinstance(self.src, _servers.RemoteTS): locator = Locator(empty=True, **params) locator.add(self.src) return locator elif self.src is None: return Locator(**params) raise ValueError(f'It is not allowed to build Locator from the object of type "{type(self.src)}"')
[docs]class Config(_FileHandler_): def __init__( self, config=None, locator=None, acl=None, otq_path=None, csv_path=None, clean_up=True, copy=True, session_ref=None, license=None, ): """ Parameters ---------- config: path or Config Allows to specify a custom config. None is to use temporary generated config. Default is None. locator: Locator Allows to specify a custom locator file. None is to use temporary generated locator. Default is None. acl: ACL Allows to specify a custom acl file. None is to use temporary generated acl. Default is None. otq_path: list of paths to lookup queries OTQ_PATH parameter in the OneTick config file. Default is None, that is equal to the empty list. csv_path: list of paths to lookup csv files CSV_PATH parameter in the OneTick config file. Default is None, that is equal to the empty list. clean_up: bool If True, then temporary config file will be removed when the Config instance will be destroyed. It is helpful for debug purpose. Default is True. copy: bool If True, then the passed custom config file will be copied firstly before any usage with it. It might be used when you want to work with a custom config file, but don't want to change to change the original file; in that case a custom config will be copied into a temporary config file and every request for modification will be executed for that temporary config. Default is True. license: instance from the onetick.py.license module License to use. If it is not set, then onetick.py.license.Default is used. """ if config and (locator or acl): raise ValueError("It is not allowed to use 'config' parameter along with 'locator' or 'acl'") # builders that construct locator and acl based on parameters acl_builder = ACLBuilder(src=acl, clean_up=clean_up, copy=copy, session_ref=session_ref) locator_builder = LocatorBuilder(src=locator, clean_up=clean_up, copy=copy, session_ref=session_ref) config_copied = True if config: # copy passed file, we should not work with externally passed files if copy: self._file = utils.TmpFile(".cfg", clean_up=clean_up) config_path = config.path if isinstance(config, Config) else config shutil.copyfile(config_path, self._file.path) else: self._file = utils.PermanentFile(config) config_copied = False if utils.is_param_in_config(self._file.path, "ACCESS_CONTROL_FILE"): acl_builder.src = utils.get_config_param(self._file.path, "ACCESS_CONTROL_FILE") if utils.is_param_in_config(self._file.path, "DB_LOCATOR.DEFAULT"): locator_builder.src = utils.get_config_param(self._file.path, "DB_LOCATOR.DEFAULT") else: self._file = utils.tmp_config(clean_up=clean_up) self._acl = acl_builder.build() os.environ["ONE_TICK_SESSION_ACL_PATH"] = self._acl.path self._locator = locator_builder.build() os.environ["ONE_TICK_SESSION_LOCATOR_PATH"] = self._locator.path super().__init__(self._file, clean_up=clean_up, copied=config_copied, session_ref=session_ref) # Here we can start to modify files - they are either copied or generated # ------------------------------------------------------------------------ utils.modify_config_param(self.path, "ACCESS_CONTROL_FILE", self._acl.path, throw_on_missing=False) utils.modify_config_param(self.path, "DB_LOCATOR.DEFAULT", self._locator.path, throw_on_missing=False) if otq_path: otq_path = map(str, otq_path) utils.modify_config_param(self.path, "OTQ_FILE_PATH", ",".join(otq_path), throw_on_missing=False) if csv_path: csv_path = map(str, csv_path) utils.modify_config_param(self.path, "CSV_FILE_PATH", ",".join(csv_path), throw_on_missing=False) # set license # --------------------------- custom_license = utils.is_param_in_config(self.path, "LICENSE_REPOSITORY_DIR") custom_license &= utils.is_param_in_config(self.path, "ONE_TICK_LICENSE_FILE") if license: self._license = license else: if custom_license: lic_file = utils.get_config_param(self.path, "ONE_TICK_LICENSE_FILE") lic_dir = utils.get_config_param(self.path, "LICENSE_REPOSITORY_DIR") self._license = _license.Custom(lic_file, lic_dir) else: if isinstance(locator, _servers.RemoteTS): self._license = _license.Remote() else: self._license = _license.Default() if not custom_license: # no need to set already defined custom values if self._license.dir: utils.modify_config_param(self.path, "LICENSE_REPOSITORY_DIR", self._license.dir, throw_on_missing=False) if self._license.file: utils.modify_config_param(self.path, "ONE_TICK_LICENSE_FILE", self._license.file, throw_on_missing=False) @property def acl(self): return self._acl @property def locator(self): return self._locator @property def license(self): return self._license def copy(self, clean_up=True, copy=True, session_ref=None): """ overridden version of copy """ return self.__class__(self.path, clean_up=clean_up, copy=copy, session_ref=session_ref, license=self._license) @staticmethod def build(obj=None, clean_up=True, copy=True, session_ref=None): params = {"clean_up": clean_up, "copy": copy, "session_ref": session_ref} if isinstance(obj, str): return Config(obj, **params) elif isinstance(obj, utils.File): return Config(obj.path, **params) elif isinstance(obj, Config): return obj.copy(**params) elif obj is None: return Config(**params) raise ValueError(f'It is not allowed to build Config from the object of type "{type(obj)}"') def cleanup(self): # no logic to clean up content self._acl.cleanup() self._locator.cleanup() @property def otq_path(self): try: return utils.get_config_param(self.path, "OTQ_FILE_PATH") except AttributeError: return None
[docs]class Session(object): """ A class for setting up working OneTick session. It keeps configuration files during the session and allows to manage them. When instance is out of scope, then instance cleans up config files and configuration. There are several ways to leave the scope: .close(), __del__() and __exit__() methods. It is allowed to have only one alive instance in the process. If you don't use Session's instance, then ``ONE_TICK_CONFIG`` environment variable should be set to be able to work with OneTick. If config file is not set then temporary is generated. Config includes locator and acl file, and if they are not set, then they are generated Parameters ---------- config : str, Config, optional Path to an existing OneTick config file; if it is not set, then config will generated. If config is not set, then temporary config is generated. Default is None. clean_up : bool, optional A flag to control cleaning up process: if it is True then all temporary generated files will be automatically removed. It is helpful for debugging. The flag affects only generated files, but does not externally passed. Default is True. copy : bool, optional A flag to control file copy process: if it is True then all externally passed files will be copied before usage, otherwise all modifications during an existing session happen directly with passed config files. NOTE: we suggest to set this flag only when you fully understand it's effect. Default is True. override_env : bool, optional If flag is True, then unconditionally os.environ['ONE_TICK_CONFIG'] will be overriden with a config that belongs to a Session. Otherwise os.envrion['ONE_TICK_CONFIG'] will be defined in the scope of session only when externally 'ONE_TICK_CONFIG' is not defined. For example, it is helpful when you test ascii_loader that uses 'ONE_TICK_CONFIG' only. Default is False ( default is False, because overriding external evnironment variable might be not obvious and desirable ) redirect_logs: bool, optional If flag is True, then OneTick logs will be redirected into a temporary log file. Otherwise logs will be mixed with output. Default is True. Attributes ---------- config: Config a reference to the underlying Config object that represents OneTick config file acl: ACL a reference to the underlying ACL object that represents OneTick access control list file locator: Locator a reference to the underlying Locator that represents OneTick locator file """ _instance = None def __init__(self, config=None, clean_up=True, copy=True, override_env=False, redirect_logs=True): self._construct(config, clean_up, copy, override_env, redirect_logs) def _construct(self, config=None, clean_up=True, copy=True, override_env=False, redirect_logs=True): if Session._instance: raise MultipleSessionsException( "It is forbidden to use multiple active sessions simultaniously in one process" ) def onetick_cfg_rollback(var): """ function to rollback ONE_TICK_CONFIG state """ def _impl(): if var is None: if "ONE_TICK_CONFIG" in os.environ: del os.environ["ONE_TICK_CONFIG"] else: os.environ["ONE_TICK_CONFIG"] = var return _impl self._lib = None self._env_rollback = onetick_cfg_rollback(os.environ.get("ONE_TICK_CONFIG", None)) self._override_env = override_env self._config = Config.build(config, clean_up=clean_up, copy=copy, session_ref=self) os.environ["ONE_TICK_SESSION_CFG_PATH"] = self._config.path try: if "ONE_TICK_CONFIG" not in os.environ: os.environ["ONE_TICK_CONFIG"] = self._config.path else: if override_env: os.environ["ONE_TICK_CONFIG"] = self._config.path else: warnings.warn( UserWarning( "ONE_TICK_CONFIG env variable has been set before a session, " "and in the session scope it is not related to the session config. " "If you want to make ONE_TICKC_CONFIG env variable be consistent " "with the session, then look at the override_env flag " "for the Session constructor" ) ) OneTickLib().cleanup() self._log_file = utils.TmpFile(suffix=".onetick.log", clean_up=clean_up) self._lib = OneTickLib(self._config.path, log_file=self._log_file.path) except: # noqa self._env_rollback() # TODO: rollback, but need to wait BDS-91 raise Session._instance = self self._ts_dbs = {}
[docs] def use(self, *items): """ Makes DB or TS available inside the session. Parameters ---------- items : :py:class:`~onetick.py.DB` or :py:class:`~onetick.py.servers.RemoteTS` objects Items to be added to session. """ for item in items: self.locator.add(item) if isinstance(item, _db.db._DB): try: self.acl.add(item) except: # noqa self.locator.remove(item) raise
[docs] def use_stub(self, stub_name): """ Adds stub-DB into the session. The shortcut for ``.use(otp.DB(stub_name))`` Parameters ---------- stub_name : str name of the stub """ return self.use(_db.DB(stub_name))
def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() @staticmethod def _available_dbs(): return _databases() def _get_ts_dbs(self): locator_dbs = self.locator.databases all_dbs = self._available_dbs() for db_name in all_dbs: if db_name not in locator_dbs: if db_name not in self._ts_dbs: self._ts_dbs[db_name] = _db.db._DB(db_name)
[docs] def close(self): """ Close session """ if Session._instance == self: try: if self._config: del self._config self._config = None finally: if self._lib: self._lib.cleanup() self._lib = None self._env_rollback() # del os.environ['ONE_TICK_SESSION_CFG_PATH'] # del os.environ['ONE_TICK_SESSION_ACL_PATH'] # del os.environ['ONE_TICK_SESSION_LOCATOR_PATH'] Session._instance = None
def __del__(self): self.close() @property def config(self): return self._config @config.setter def config(self, cfg): self.close() self._construct(cfg, override_env=self._override_env) @property def acl(self): return self._config.acl @property def locator(self): return self._config.locator @property def license(self): return self._config.license @property def ts_databases(self): self._get_ts_dbs() return self._ts_dbs @property def databases(self): return self._available_dbs()