import os
import warnings
import shutil
from datetime import datetime
from typing import List
from locator_parser.io import FileReader, FileWriter, PrintWriter
from locator_parser.actions import Add, GetAll, Delete
from locator_parser.common import apply_actions
from locator_parser import locator as _locator
from locator_parser import acl as _acl
from abc import ABC, abstractmethod
from onetick.lib.instance import OneTickLib
from . import utils
from . import license as _license
from . import db as _db
from . import servers as _servers
from .configuration import config
from .db._inspection import databases as _databases
class EntityOperationFailed(Exception):
"""
Raise when operation with an entity for a config
in this module failed
"""
class MultipleSessionsException(Exception):
"""
Raises when user tries to initiate a new one Session
when another one is already used
"""
def _apply_to_entities(cfg, operations):
"""
Function generalizes operations for locators and ACLs:
tries to apply ``operations``, and does rollback in case
OneTick config is invalid.
"""
if not operations:
return
for entities, func, roll_back_func in operations:
if not entities:
raise ValueError("At least one argument in parameter 'entities' is expected.")
result = func(entities)
if not result:
entity_name = entities[0].__class__.__name__
entities = list(map(str, entities))
raise EntityOperationFailed(
f'Operation {func.__name__} for {entity_name}s {entities}'
f' for {cfg.__class__.__name__} "{cfg.path}" has failed'
)
try:
cfg.reload()
except Exception:
for entities, func, roll_back_func in reversed(operations):
roll_back_func(entities)
raise
class _FileHandler_(ABC):
def __init__(self, file_h=None, clean_up=True, copied=True, session_ref=None):
self._file = file_h
self._clean_up = clean_up
# flag to understand whether we work with externally passed files;
# it is set and affects logic, when copy=False
self._copied = copied
self._session_ref = session_ref
@property
def path(self):
return self._file.path
@property
def file(self):
return self._file
def copy(self, clean_up=True, copy=True, session_ref=None):
return self.__class__(self.path, clean_up=clean_up, copy=copy, session_ref=session_ref) # pylint: disable=E1123
@abstractmethod
def cleanup(self):
pass
@staticmethod
def _db_in_dbs_case_insensitive(db_id: str, databases: List[str]):
for db_name in databases:
if db_id.upper() == db_name.upper():
return True
return False
class _CommonBuilder_(ABC):
def __init__(self, src=None, clean_up=True, copy=True, session_ref=None):
self.src = src
self.clean_up = clean_up
self.copy = copy
self.session_ref = session_ref
@abstractmethod
def build(self):
pass
[docs]class ACL(_FileHandler_):
class User(str):
"""
Subclass represents an ACL user
"""
pass
def __init__(self, path=None, clean_up=True, copy=True, session_ref=None):
"""
Class representing OneTick database access list file.
ACL is the file that describes the list of the users
that are allowed to access the database and what permissions do they have.
Parameters
----------
path: str
A path to custom acl file. Default is `None`, that means to generate a temporary acl file.
clean_up: bool
If `True`, then temporary acl file will be removed when ACL object will be destroyed. It is
helpful for debug purpose. Default is `True`.
copy: bool
If `True`, then the passed custom acl file by the ``path`` parameter will be copied first before
usage. It might be used when you want to work with a custom acl file, but don't want to change
the original file; in that case a custom acl file will be copied into a temporary file and
every request for modification will be executed for that temporary file. Default is `True`.
"""
copied = True
# TODO: implement this logic later
# if copy is None and path is not None:
# # if copy rule is not specified, but path is specified
# # then we set copy to True with safety goal, otherwise
# # we would might change a permanent file without user
# # acknowledgment
# copy = True
# raise Warning("You set the ACL file, but have not specify a copy rule. "
# "We copy it with safety goal, and it means you will work "
# "with copied file instead of passed. If you want to work "
# "with passed file directly, then you could set the 'copy' "
# "parameter to True.")
# if path is set, then copy file, we should not work directly
# with externally passed files
if copy:
if path:
file_h = utils.TmpFile(suffix=".acl", clean_up=clean_up)
shutil.copyfile(path, file_h.path)
else:
file_h = utils.tmp_acl(clean_up=clean_up)
else:
if path:
file_h = utils.PermanentFile(path)
copied = False
else:
file_h = utils.tmp_acl(clean_up=clean_up)
assert file_h is not None
super().__init__(file_h, clean_up=clean_up, copied=copied, session_ref=session_ref)
self._added_dbs = []
def cleanup(self):
self._remove_db(self._added_dbs)
self._added_dbs = []
self.reload()
def _apply_actions(self, actions, print_writer=False):
writer = PrintWriter() if print_writer else FileWriter(self.path)
flush = False if print_writer else True
return apply_actions(_acl.parse_acl, FileReader(self.path), writer, actions, flush=flush)
def _add_db(self, dbs):
actions = []
for db in dbs:
actions.append(Add(_acl.DB(id=db.id, read_access="true")))
permissions = {}
if db._write:
permissions["write_access"] = "true"
if hasattr(db, "_destroy_access") and db._destroy_access:
permissions["destroy_access"] = "true"
if permissions:
action = Add(_acl.Allow(role="Admin", **permissions))
action.add_where(_acl.DB, id=db.id)
actions.append(action)
return self._apply_actions(actions)
def _remove_db(self, dbs):
actions = []
for db in dbs:
action = Delete()
action.add_where(_acl.DB, id=db.id)
actions.append(action)
return self._apply_actions(actions)
def _add_user(self, users):
actions = []
for user in users:
action = Add(_acl.User(name=user))
action.add_where(_acl.Role, name="Admin")
actions.append(action)
return self._apply_actions(actions)
def _remove_user(self, users):
actions = []
for user in users:
action = Delete()
action.add_where(_acl.Role, name="Admin")
action.add_where(_acl.User, name=user)
actions.append(action)
return self._apply_actions(actions)
def add(self, *entities):
"""
Add entities to the ACL and reload it.
If it fails, then tries to roll back to the original state.
Parameters
----------
entities: DB or ACL.User
Raises
------
TypeError, EntityOperationFailed
"""
if len(entities) == 0:
raise ValueError("At least one argument in parameter 'entities' is expected.")
dbs = []
users = []
for entity in entities:
if isinstance(entity, _db.DB):
if self._db_in_dbs_case_insensitive(entity.id, self.databases):
if '//' not in entity.name:
warnings.warn(f"Database '{entity.id}' is already added to the ACL"
" and will not be rewritten with this command."
f" Notice that databases' names are case insensitive.")
continue
dbs.append(entity)
elif isinstance(entity, ACL.User):
users.append(entity)
else:
raise TypeError(f'Entity of type "{type(entity)}" is not supported')
operations = []
if dbs:
operations.append((dbs, self._add_db, self._remove_db))
if users:
operations.append((users, self._add_user, self._remove_user))
_apply_to_entities(self, operations)
self._added_dbs.extend(dbs)
def remove(self, *entities):
"""
Remove entities from the ACL and reload it.
If it fails, then tries to roll back to the original state.
Parameters
----------
entities: DB or ACL.User
Raises
------
ValueError, TypeError, EntityOperationFailed
"""
if len(entities) == 0:
raise ValueError("At least one argument in parameter 'entities' is expected.")
dbs = []
users = []
for entity in entities:
if isinstance(entity, _db.DB):
if entity not in self._added_dbs:
raise ValueError(f'DB "{entity}" was not added')
dbs.append(entity)
elif isinstance(entity, ACL.User):
users.append(entity)
else:
raise TypeError(f'Entity of type "{type(entity)}" is not supported')
operations = []
if dbs:
operations.append((dbs, self._remove_db, self._add_db))
if users:
operations.append((users, self._remove_user, self._add_user))
_apply_to_entities(self, operations)
for db in dbs:
self._added_dbs.remove(db)
def reload(self, db=None):
if self._session_ref is not None:
return utils.reload_config(db, config_type='ACCESS_LIST')
def _read_dbs(self):
get_db = GetAll()
get_db.add_where(_acl.DB)
self._apply_actions([get_db], print_writer=True)
return list(map(lambda x: x.id, get_db.result))
def _dbs(self):
action = GetAll()
action.add_where(_acl.DB)
self._apply_actions([action], print_writer=True)
return list(map(lambda x: x.id, action.result))
def _users(self):
action = GetAll()
action.add_where(_acl.Role, name="Admin")
action.add_where(_acl.User)
self._apply_actions([action], print_writer=True)
return list(map(lambda x: x.name, action.result))
@property
def databases(self):
return self._dbs()
@property
def users(self):
return self._users()
class ACLBuilder(_CommonBuilder_):
def build(self):
params = {"clean_up": self.clean_up, "copy": self.copy, "session_ref": self.session_ref}
if isinstance(self.src, str):
return ACL(self.src, **params)
elif isinstance(self.src, utils.File):
return ACL(self.src.path, **params)
elif isinstance(self.src, ACL):
return self.src.copy(**params)
elif self.src is None:
return ACL(**params)
raise ValueError(f'It is not allowed to build ACL from the object of type "{type(self.src)}"')
[docs]class Locator(_FileHandler_):
def __init__(self, path=None, clean_up=True, copy=True, empty=False, session_ref=None):
"""
Class representing OneTick database locator.
Locator is the file that describes database name, location and other options.
Parameters
----------
path: str
A path to custom locator file. Default is `None`, that means to generate a temporary locator.
clean_up: bool
If True, then temporary locator will be removed when Locator object will be destroyed. It is
helpful for debug purpose. Default is `True`.
copy: bool
If `True`, then the passed custom locator by the ``path`` parameter will be copied firstly before
usage. It might be used when you want to work with a custom locator, but don't want to change
the original file; in that case a custom locator will be copied into a temporary locator and
every request for modification will be executed for that temporary locator. Default is `True`.
empty: bool
If `True`, then a temporary locator will have no databases, otherwise it will have default
otp.config.default_db, BESTEX, ALERTS, COMMON and WORKFLOW databases. Default is `False`.
"""
copied = True
# if path is set, then copy file, we should not work directly
# with externally passed files
if copy:
if path:
file_h = utils.TmpFile(".locator", clean_up=clean_up)
shutil.copyfile(path, file_h)
else:
file_h = utils.tmp_locator(clean_up=clean_up, empty=empty)
else:
if path:
file_h = utils.PermanentFile(path)
copied = False
else:
file_h = utils.tmp_locator(clean_up=clean_up, empty=empty)
assert file_h is not None
super().__init__(file_h, clean_up=clean_up, copied=copied, session_ref=session_ref)
self._added_dbs = []
self._added_ts = []
def cleanup(self):
self._remove_db(self._added_dbs)
self._remove_ts(str(server) for server in self._added_ts)
self._added_dbs = []
self._added_ts = []
self.reload()
@property
def databases(self):
return self._dbs()
@property
def tick_servers(self):
return self._ts()
def reload(self, db_=None):
if self._session_ref is not None:
return utils.reload_config(db_, config_type='LOCATOR')
def _apply_actions(self, actions, print_writer=False):
writer = PrintWriter() if print_writer else FileWriter(self.path)
flush = False if print_writer else True
return apply_actions(_locator.parse_locator, FileReader(self.path), writer, actions, flush=flush)
def _dbs(self):
action = GetAll()
action.add_where(_locator.DB)
self._apply_actions([action], print_writer=True)
return list(map(lambda x: x.id, action.result))
def _ts(self):
get_ts = GetAll()
get_ts.add_where(_locator.TickServers)
get_ts.add_where(_locator.ServerLocation)
self._apply_actions([get_ts], print_writer=True)
return [location.location for location in get_ts.result]
def _add_db(self, dbs):
actions = []
for db in dbs:
actions.append(Add(_locator.DB(id=db.id, **db.properties)))
for location in db.locations:
action = Add(_locator.Location(**location))
action.add_where(_locator.DB, id=db.id)
actions.append(action)
for raw_db in db.raw_data:
common = {k: v for k, v in raw_db.items() if k not in {'id', 'locations'}}
action = Add(_locator.RawDB(id=raw_db['id'], **common))
action.add_where(_locator.DB, id=db.id)
actions.append(action)
for location in raw_db['locations']:
action = Add(_locator.Location(**location))
action.add_where(_locator.DB, id=db.id)
action.add_where(_locator.RawDB, id=raw_db['id'])
actions.append(action)
if db.feed:
options = {k: v for k, v in db.feed.items() if k != 'type'}
action = Add(_locator.Feed(type=db.feed['type']))
action.add_where(_locator.DB, id=db.id)
actions.append(action)
action = Add(_locator.FeedOptions(**options))
action.add_where(_locator.DB, id=db.id)
action.add_where(_locator.Feed, type=db.feed['type'])
actions.append(action)
return self._apply_actions(actions)
def _remove_db(self, dbs):
actions = []
for db in dbs:
action = Delete()
action.add_where(_locator.DB, id=db.id)
actions.append(action)
return self._apply_actions(actions)
def _add_ts(self, servers):
"""
Add servers to locator file (without reloading)
Parameters
----------
servers: RemoteTS
Servers to be added to locator.
"""
actions = []
for server in servers:
if server.cep:
actions.append(Add(_locator.CEPServerLocation(location=str(server))))
else:
actions.append(Add(_locator.ServerLocation(location=str(server))))
return self._apply_actions(actions)
def _remove_ts(self, servers):
"""
Remove servers from locator file (without reloading)
Parameters
----------
servers: RemoteTS
Servers to remove from locator
"""
actions = []
for server in servers:
action = Delete()
action.add_where(_locator.ServerLocation, location=str(server))
actions.append(action)
return self._apply_actions(actions)
def _add_locator(self, locators):
"""
Add references to locators
Parameters
----------
locators: Locator
"""
actions = []
for locator in locators:
actions.append(Add(_locator.Include(path=locator.path)))
return self._apply_actions(actions)
def _remove_locator(self, locators):
"""
Remove references for locators
Parameters
----------
locators: Locator
"""
actions = []
for locator in locators:
action = Delete()
action.add_where(_locator.Include, path=locator.path)
actions.append(action)
return self._apply_actions(actions)
def add(self, *entities):
"""
Add entities to the locator and reload it.
If it fails, then tries to roll back to the original state.
Parameters
----------
entities: DB, RemoteTS or Locator
Raises
------
TypeError, EntityOperationFailed
"""
if len(entities) == 0:
raise ValueError("At least one argument in parameter 'entities' is expected.")
dbs = []
servers = []
locators = []
for entity in entities:
if isinstance(entity, _db.db._DB):
if self._db_in_dbs_case_insensitive(entity.id, self.databases):
if '//' not in entity.name:
warnings.warn(f"Database '{entity.id}' is already added to the Locator"
" and will not be rewritten with this command."
f" Notice that databases' names are case insensitive.")
continue
dbs.append(entity)
elif isinstance(entity, _servers.RemoteTS):
servers.append(entity)
elif isinstance(entity, Locator):
locators.append(entity)
else:
raise TypeError(f'Entity of type "{type(entity)}" is not supported')
operations = []
if dbs:
operations.append((dbs, self._add_db, self._remove_db))
if servers:
operations.append((servers, self._add_ts, self._remove_ts))
if locators:
operations.append((locators, self._add_locator, self._remove_locator))
_apply_to_entities(self, operations)
self._added_dbs.extend(dbs)
self._added_ts.extend(servers)
def remove(self, *entities):
"""
Remove entities from the locator and reload it.
If it fails, then tries to roll back to the original state.
Raises
------
ValueError, TypeError, EntityOperationFailed
"""
if len(entities) == 0:
raise ValueError("At least one argument in parameter 'entities' is expected.")
dbs = []
servers = []
locators = []
for entity in entities:
if isinstance(entity, _db.db._DB):
if entity not in self._added_dbs:
raise ValueError(f'DB "{entity}" was not added')
dbs.append(entity)
elif isinstance(entity, _servers.RemoteTS):
if entity not in self._added_ts:
raise ValueError(f'Tick server "{entity}" was not added')
servers.append(entity)
elif isinstance(entity, Locator):
locators.append(entity)
else:
raise TypeError(f'Entity of type "{type(entity)}" is not supported')
operations = []
if dbs:
operations.append((dbs, self._remove_db, self._add_db))
if servers:
operations.append((servers, self._remove_ts, self._add_ts))
if locators:
operations.append((locators, self._remove_locator, self._add_locator))
_apply_to_entities(self, operations)
for db in dbs:
self._added_dbs.remove(db)
for server in servers:
self._added_ts.remove(server)
def __contains__(self, item):
if str(item) in self.databases:
return True
return False
class LocatorBuilder(_CommonBuilder_):
def build(self):
params = {"clean_up": self.clean_up, "copy": self.copy, "session_ref": self.session_ref}
if isinstance(self.src, str):
return Locator(self.src, **params)
elif isinstance(self.src, utils.File):
return Locator(self.src.path, **params)
elif isinstance(self.src, Locator):
return self.src.copy(**params)
elif isinstance(self.src, _servers.RemoteTS):
locator = Locator(empty=True, **params)
locator.add(self.src)
return locator
elif self.src is None:
return Locator(**params)
raise ValueError(f'It is not allowed to build Locator from the object of type "{type(self.src)}"')
[docs]class Config(_FileHandler_):
def __init__(
self,
config=None,
locator=None,
acl=None,
otq_path=None,
csv_path=None,
clean_up=True,
copy=True,
session_ref=None,
license=None,
):
"""
Parameters
----------
config: path or Config
Allows to specify a custom config. None is to use temporary generated config. Default is None.
locator: Locator
Allows to specify a custom locator file. None is to use temporary generated locator. Default is None.
acl: ACL
Allows to specify a custom acl file. None is to use temporary generated acl. Default is None.
otq_path: list of paths to lookup queries
OTQ_PATH parameter in the OneTick config file. Default is None, that is equal to the empty list.
csv_path: list of paths to lookup csv files
CSV_PATH parameter in the OneTick config file. Default is None, that is equal to the empty list.
clean_up: bool
If True, then temporary config file will be removed when the Config instance will be destroyed.
It is helpful for debug purpose. Default is True.
copy: bool
If True, then the passed custom config file will be copied firstly before any usage with it.
It might be used when you want to work with a custom config file, but don't want to change to
change the original file; in that case a custom config will be copied into a temporary config
file and every request for modification will be executed for that temporary config. Default
is True.
license: instance from the onetick.py.license module
License to use. If it is not set, then onetick.py.license.Default is used.
"""
if config and (locator or acl):
raise ValueError("It is not allowed to use 'config' parameter along with 'locator' or 'acl'")
# builders that construct locator and acl based on parameters
acl_builder = ACLBuilder(src=acl, clean_up=clean_up, copy=copy, session_ref=session_ref)
locator_builder = LocatorBuilder(src=locator, clean_up=clean_up, copy=copy, session_ref=session_ref)
config_copied = True
if config:
# copy passed file, we should not work with externally passed files
if copy:
self._file = utils.TmpFile(".cfg", clean_up=clean_up)
config_path = config.path if isinstance(config, Config) else config
shutil.copyfile(config_path, self._file.path)
else:
self._file = utils.PermanentFile(config)
config_copied = False
if utils.is_param_in_config(self._file.path, "ACCESS_CONTROL_FILE"):
acl_builder.src = utils.get_config_param(self._file.path, "ACCESS_CONTROL_FILE")
if utils.is_param_in_config(self._file.path, "DB_LOCATOR.DEFAULT"):
locator_builder.src = utils.get_config_param(self._file.path, "DB_LOCATOR.DEFAULT")
else:
self._file = utils.tmp_config(clean_up=clean_up)
self._acl = acl_builder.build()
os.environ["ONE_TICK_SESSION_ACL_PATH"] = self._acl.path
self._locator = locator_builder.build()
os.environ["ONE_TICK_SESSION_LOCATOR_PATH"] = self._locator.path
super().__init__(self._file, clean_up=clean_up, copied=config_copied, session_ref=session_ref)
# Here we can start to modify files - they are either copied or generated
# ------------------------------------------------------------------------
utils.modify_config_param(self.path, "ACCESS_CONTROL_FILE", self._acl.path, throw_on_missing=False)
utils.modify_config_param(self.path, "DB_LOCATOR.DEFAULT", self._locator.path, throw_on_missing=False)
if otq_path:
otq_path = map(str, otq_path)
utils.modify_config_param(self.path, "OTQ_FILE_PATH", ",".join(otq_path), throw_on_missing=False)
if csv_path:
csv_path = map(str, csv_path)
utils.modify_config_param(self.path, "CSV_FILE_PATH", ",".join(csv_path), throw_on_missing=False)
# set license
# ---------------------------
custom_license = utils.is_param_in_config(self.path, "LICENSE_REPOSITORY_DIR")
custom_license &= utils.is_param_in_config(self.path, "ONE_TICK_LICENSE_FILE")
if license:
self._license = license
else:
if custom_license:
lic_file = utils.get_config_param(self.path, "ONE_TICK_LICENSE_FILE")
lic_dir = utils.get_config_param(self.path, "LICENSE_REPOSITORY_DIR")
self._license = _license.Custom(lic_file, lic_dir)
else:
if isinstance(locator, _servers.RemoteTS):
self._license = _license.Remote()
else:
self._license = _license.Default()
if not custom_license: # no need to set already defined custom values
if self._license.dir:
utils.modify_config_param(self.path,
"LICENSE_REPOSITORY_DIR",
self._license.dir,
throw_on_missing=False)
if self._license.file:
utils.modify_config_param(self.path,
"ONE_TICK_LICENSE_FILE",
self._license.file,
throw_on_missing=False)
@property
def acl(self):
return self._acl
@property
def locator(self):
return self._locator
@property
def license(self):
return self._license
def copy(self, clean_up=True, copy=True, session_ref=None):
""" overridden version of copy """
return self.__class__(self.path, clean_up=clean_up, copy=copy, session_ref=session_ref, license=self._license)
@staticmethod
def build(obj=None, clean_up=True, copy=True, session_ref=None):
params = {"clean_up": clean_up, "copy": copy, "session_ref": session_ref}
if isinstance(obj, str):
return Config(obj, **params)
elif isinstance(obj, utils.File):
return Config(obj.path, **params)
elif isinstance(obj, Config):
return obj.copy(**params)
elif obj is None:
return Config(**params)
raise ValueError(f'It is not allowed to build Config from the object of type "{type(obj)}"')
def cleanup(self):
# no logic to clean up content
self._acl.cleanup()
self._locator.cleanup()
@property
def otq_path(self):
try:
return utils.get_config_param(self.path, "OTQ_FILE_PATH")
except AttributeError:
return None
[docs]class Session(object):
"""
A class for setting up working OneTick session. It keeps configuration files during
the session and allows to manage them. When instance is out of scope, then instance
cleans up config files and configuration.
You can leave the scope manually with method :py:meth:`close`.
Also, session is closed automatically if this object is used as a context manager.
It is allowed to have only one alive session instance in the process.
If you don't use Session's instance, then ``ONE_TICK_CONFIG`` environment variable
should be set to be able to work with OneTick.
If config file is not set then temporary is generated.
Config includes locator and acl file, and if they are not set, then they are generated.
Parameters
----------
config : str, Config, optional
Path to an existing OneTick config file; if it is not set, then config will be generated.
If config is not set, then temporary config is generated. Default is None.
clean_up : bool, optional
A flag to control cleaning up process: if it is True then all temporary generated files will
be automatically removed. It is helpful for debugging. The flag affects only generated files, but
does not externally passed. Default is True.
copy : bool, optional
A flag to control file copy process: if it is True then all externally passed files will be
copied before usage, otherwise all modifications during an existing session happen directly with
passed config files. NOTE: we suggest to set this flag only when you fully understand it's effect.
Default is True.
override_env : bool, optional
If flag is True, then unconditionally ``ONE_TICK_CONFIG`` environment variable will be overridden
with a config that belongs to a Session. Otherwise ``ONE_TICK_CONFIG``
will be defined in the scope of session only when it is not defined externally.
For example, it is helpful when you test ascii_loader that uses 'ONE_TICK_CONFIG' only.
Default is False ( default is False, because overriding external environment variable
might be not obvious and desirable )
redirect_logs: bool, optional
If flag is True, then OneTick logs will be redirected into a temporary log file. Otherwise
logs will be mixed with output. Default is True.
Examples
--------
If session is defined with environment, OneTick can be used right away:
>>> 'ONE_TICK_CONFIG' in os.environ
True
>>> list(otp.databases()) # doctest: +ELLIPSIS
['COMMON', 'DEMO_L1', ..., 'SOME_DB']
>>> data = otp.DataSource('SOME_DB', symbol='S1', tick_type='TT')
>>> otp.run(data)
Time X
0 2003-12-01 00:00:00.000 1
1 2003-12-01 00:00:00.001 2
2 2003-12-01 00:00:00.002 3
"""
# TODO: create article for Session in Guides or Concepts
_instance = None
def __init__(self, config=None, clean_up=True, copy=True, override_env=False, redirect_logs=True):
self._construct(config, clean_up, copy, override_env, redirect_logs)
def _construct(self, config=None, clean_up=True, copy=True, override_env=False, redirect_logs=True):
if Session._instance:
raise MultipleSessionsException(
"It is forbidden to use multiple active sessions simultaniously in one process"
)
def onetick_cfg_rollback(var):
"""
function to rollback ONE_TICK_CONFIG state
"""
def _impl():
if var is None:
if "ONE_TICK_CONFIG" in os.environ:
del os.environ["ONE_TICK_CONFIG"]
else:
os.environ["ONE_TICK_CONFIG"] = var
return _impl
self._lib = None
self._env_rollback = onetick_cfg_rollback(os.environ.get("ONE_TICK_CONFIG", None))
self._override_env = override_env
self._config = Config.build(config, clean_up=clean_up, copy=copy, session_ref=self)
os.environ["ONE_TICK_SESSION_CFG_PATH"] = self._config.path
try:
if "ONE_TICK_CONFIG" not in os.environ:
os.environ["ONE_TICK_CONFIG"] = self._config.path
else:
if override_env:
os.environ["ONE_TICK_CONFIG"] = self._config.path
else:
warnings.warn(
UserWarning(
"ONE_TICK_CONFIG env variable has been set before a session, "
"and in the session scope it is not related to the session config. "
"If you want to make ONE_TICKC_CONFIG env variable be consistent "
"with the session, then look at the override_env flag "
"for the Session constructor"
)
)
OneTickLib().cleanup()
self._log_file = utils.TmpFile(suffix=".onetick.log", clean_up=clean_up)
self._lib = OneTickLib(self._config.path, log_file=self._log_file.path)
except: # noqa
self._env_rollback()
# TODO: rollback, but need to wait BDS-91
raise
Session._instance = self
self._ts_dbs = {}
[docs] def use(self, *items):
"""
Makes DB or TS available inside the session.
Parameters
----------
items : :py:class:`~onetick.py.DB` or :py:class:`~onetick.py.servers.RemoteTS` objects
Items to be added to session.
Examples
--------
(note that ``session`` is created before this example)
>>> list(otp.databases()) # doctest: +ELLIPSIS
['COMMON', 'DEMO_L1', ..., 'SOME_DB']
>>> new_db = otp.DB('ZZZZ')
>>> session.use(new_db)
>>> list(otp.databases()) # doctest: +ELLIPSIS
['COMMON', 'DEMO_L1', ..., 'SOME_DB', 'ZZZZ']
"""
self.locator.add(*items)
dbs = []
for item in items:
if isinstance(item, _db.db._DB):
dbs.append(item)
try:
if dbs:
self.acl.add(*dbs)
except Exception:
self.locator.remove(*items)
raise
[docs] def use_stub(self, stub_name):
"""
Adds stub-DB into the session.
The shortcut for ``.use(otp.DB(stub_name))``
Parameters
----------
stub_name : str
name of the stub
"""
return self.use(_db.DB(stub_name))
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
@staticmethod
def _available_dbs():
return _databases()
def _get_ts_dbs(self):
locator_dbs = self.locator.databases
all_dbs = self._available_dbs()
for db_name in all_dbs:
if db_name not in locator_dbs:
if db_name not in self._ts_dbs:
self._ts_dbs[db_name] = _db.db._DB(db_name)
[docs] def close(self):
"""
Close session
"""
if Session._instance == self:
try:
if self._config:
del self._config
self._config = None
finally:
if self._lib:
self._lib.cleanup()
self._lib = None
self._env_rollback()
# del os.environ['ONE_TICK_SESSION_CFG_PATH']
# del os.environ['ONE_TICK_SESSION_ACL_PATH']
# del os.environ['ONE_TICK_SESSION_LOCATOR_PATH']
Session._instance = None
def __del__(self):
self.close()
@property
def config(self):
"""
A reference to the underlying Config object that represents OneTick config file.
Returns
-------
:py:class:`onetick.py.session.Config`
"""
return self._config
@config.setter
def config(self, cfg):
self.close()
self._construct(cfg, override_env=self._override_env)
@property
def acl(self):
"""
A reference to the underlying ACL object that represents OneTick access control list file.
Returns
-------
:py:class:`onetick.py.session.ACL`
"""
return self._config.acl
@property
def locator(self):
"""
A reference to the underlying Locator that represents OneTick locator file.
Returns
-------
:py:class:`onetick.py.session.Locator`
"""
return self._config.locator
@property
def license(self):
return self._config.license
@property
def ts_databases(self):
self._get_ts_dbs()
return self._ts_dbs
@property
def databases(self):
return self._available_dbs()
[docs]class TestSession(Session):
def __init__(self, *args, **kwargs):
"""
This class does the same as :py:class:`onetick.py.session.Session`
but also define default values.
"""
config['tz'] = 'EST5EDT'
config['default_db'] = 'DEMO_L1'
config['default_symbol'] = 'AAPL'
config['default_start_time'] = datetime(2003, 12, 1, 0, 0, 0)
config['default_end_time'] = datetime(2003, 12, 4, 0, 0, 0)
super().__init__(*args, **kwargs)