import os
import warnings
import shutil
from datetime import datetime
from typing import List

from locator_parser.io import FileReader, FileWriter, PrintWriter
from locator_parser.actions import Add, GetAll, Delete
from locator_parser.common import apply_actions
from locator_parser import locator as _locator
from locator_parser import acl as _acl
from abc import ABC, abstractmethod

from onetick.lib.instance import OneTickLib

from . import utils
from . import license as _license
from . import db as _db
from . import servers as _servers
from .configuration import config
from .db._inspection import databases as _databases


class EntityOperationFailed(Exception):
    """
    Raise when operation with an entity for a config
    in this module failed
    """


class MultipleSessionsException(Exception):
    """
    Raises when user tries to initiate a new one Session
    when another one is already used
    """


def _apply_to_entities(cfg, operations):
    """
    Function generalizes operations for locators and ACLs:
    tries to apply ``operations``, and does rollback in case
    an operation fails or OneTick config is invalid.

    Parameters
    ----------
    cfg:
        Object exposing ``reload()`` and ``path`` (an ACL or a Locator in this module).
    operations: list of tuples ``(entities, func, roll_back_func)``
        ``func(entities)`` applies a change and returns a truthy value on success;
        ``roll_back_func(entities)`` reverts that change.

    Raises
    ------
    ValueError
        If any operation has an empty ``entities`` collection. It is raised before
        anything is applied.
    EntityOperationFailed
        If ``func`` reports a failure. Operations applied before it are rolled back.
    Exception
        Whatever ``cfg.reload()`` raises; all operations are rolled back first.
    """
    if not operations:
        return

    # validate the whole request first, so that a malformed one
    # never leaves the file partially modified
    for entities, _, _ in operations:
        if not entities:
            raise ValueError("At least one argument in parameter 'entities' is expected.")

    applied = []

    def _roll_back():
        # revert in the reverse order of application
        for done_entities, done_roll_back in reversed(applied):
            done_roll_back(done_entities)

    for entities, func, roll_back_func in operations:
        if not func(entities):
            # the failed operation itself is not reverted, only the ones before it
            _roll_back()
            entity_name = entities[0].__class__.__name__
            entities = list(map(str, entities))
            raise EntityOperationFailed(f'Operation {func.__name__} for {entity_name}s {entities}'
                                        f' for {cfg.__class__.__name__} "{cfg.path}" has failed')
        applied.append((entities, roll_back_func))

    try:
        cfg.reload()
    except Exception:
        _roll_back()
        raise


class _FileHandler_(ABC):
    """
    Base class for objects backed by a file handle (ACL, Locator, Config).
    """

    def __init__(self, file_h=None, clean_up=True, copied=True, session_ref=None):
        """
        Parameters
        ----------
        file_h:
            File handle object; its ``path`` attribute is exposed via :attr:`path`.
        clean_up: bool
            Stored as ``_clean_up``.
        copied: bool
            Stored as ``_copied``, see the comment below.
        session_ref:
            Stored as ``_session_ref``.
        """
        self._file = file_h
        self._clean_up = clean_up
        # flag to understand whether we work with externally passed files;
        # it is set and affects logic, when copy=False
        self._copied = copied
        self._session_ref = session_ref

    @property
    def path(self):
        """ Path of the underlying file handle """
        return self._file.path

    @property
    def file(self):
        """ The underlying file handle """
        return self._file

    def copy(self, clean_up=True, copy=True, session_ref=None):
        """
        Construct a new object of the same class from the path of this one.
        """
        return self.__class__(self.path,
                              clean_up=clean_up,
                              copy=copy,
                              session_ref=session_ref)  # pylint: disable=E1123

    @abstractmethod
    def cleanup(self):
        pass

    @staticmethod
    def _db_in_dbs_case_insensitive(db_id: str, databases: List[str]):
        """
        Return True if ``db_id`` equals any of ``databases`` ignoring the case.
        """
        return any(db_id.upper() == db_name.upper() for db_name in databases)


class _CommonBuilder_(ABC):
    """
    Base class for builders: keeps the source object and construction flags,
    subclasses implement :meth:`build`.
    """

    def __init__(self, src=None, clean_up=True, copy=True, session_ref=None):
        self.src = src
        self.clean_up = clean_up
        self.copy = copy
        self.session_ref = session_ref

    @abstractmethod
    def build(self):
        pass
class ACL(_FileHandler_):

    class User(str):
        """
        Subclass represents an ACL user
        """

    def __init__(self, path=None, clean_up=True, copy=True, session_ref=None):
        """
        Class representing OneTick database access list file.

        ACL is the file that describes the list of the users that are allowed to access the database
        and what permissions do they have.

        Parameters
        ----------
        path: str
            A path to custom acl file. Default is `None`, that means to generate a temporary acl file.
        clean_up: bool
            If `True`, then temporary acl file will be removed when ACL object will be destroyed.
            It is helpful for debug purpose.

            Default is `True`.
        copy: bool
            If `True`, then the passed custom acl file by the ``path`` parameter will be copied
            first before usage. It might be used when you want to work with a custom acl file, but
            don't want to change the original file; in that case a custom acl file will be copied
            into a temporary file and every request for modification will be executed for that
            temporary file.

            Default is `True`.
        """
        # TODO: implement this logic later:
        # when ``copy`` is not specified but ``path`` is, copy the file for safety
        # and warn the user that the copy (not the passed file) is being used.

        if not path:
            # nothing was passed: a temporary acl is generated regardless of ``copy``
            file_h = utils.tmp_acl(clean_up=clean_up)
            copied = True
        elif copy:
            # if path is set, then copy file, we should not work directly
            # with externally passed files
            file_h = utils.TmpFile(suffix=".acl", clean_up=clean_up)
            shutil.copyfile(path, file_h.path)
            copied = True
        else:
            file_h = utils.PermanentFile(path)
            copied = False

        assert file_h is not None

        super().__init__(file_h, clean_up=clean_up, copied=copied, session_ref=session_ref)

        self._added_dbs = []

    def cleanup(self):
        """ Remove the databases added through this object and reload the ACL. """
        self._remove_db(self._added_dbs)
        self._added_dbs = []
        self.reload()

    def _apply_actions(self, actions, print_writer=False):
        """
        Run ``actions`` against the acl file. With ``print_writer`` set
        a ``PrintWriter`` is used instead of a ``FileWriter`` and flushing is disabled.
        """
        if print_writer:
            writer = PrintWriter()
        else:
            writer = FileWriter(self.path)

        return apply_actions(_acl.parse_acl,
                             FileReader(self.path),
                             writer,
                             actions,
                             flush=not print_writer)

    def _add_db(self, dbs):
        """ Add databases to the acl file (without reloading) """
        actions = []

        for db in dbs:
            actions.append(Add(_acl.DB(id=db.id, read_access="true")))

            permissions = {}
            if db._write:
                permissions["write_access"] = "true"
            if getattr(db, "_destroy_access", False):
                permissions["destroy_access"] = "true"

            if not permissions:
                continue

            allow = Add(_acl.Allow(role="Admin", **permissions))
            allow.add_where(_acl.DB, id=db.id)
            actions.append(allow)

        return self._apply_actions(actions)

    def _remove_db(self, dbs):
        """ Remove databases from the acl file (without reloading) """
        actions = []

        for db in dbs:
            removal = Delete()
            removal.add_where(_acl.DB, id=db.id)
            actions.append(removal)

        return self._apply_actions(actions)

    def _add_user(self, users):
        """ Add users under the "Admin" role (without reloading) """
        actions = []

        for user in users:
            addition = Add(_acl.User(name=user))
            addition.add_where(_acl.Role, name="Admin")
            actions.append(addition)

        return self._apply_actions(actions)

    def _remove_user(self, users):
        """ Remove users from the "Admin" role (without reloading) """
        actions = []

        for user in users:
            removal = Delete()
            removal.add_where(_acl.Role, name="Admin")
            removal.add_where(_acl.User, name=user)
            actions.append(removal)

        return self._apply_actions(actions)

    def add(self, *entities):
        """
        Add entities to the ACL and reload it. If it fails, then
        tries to roll back to the original state.

        Parameters
        ----------
        entities: DB or ACL.User

        Raises
        ------
        ValueError, TypeError, EntityOperationFailed
        """
        if not entities:
            raise ValueError("At least one argument in parameter 'entities' is expected.")

        new_dbs = []
        new_users = []

        for item in entities:
            if isinstance(item, _db.DB):
                if not self._db_in_dbs_case_insensitive(item.id, self.databases):
                    new_dbs.append(item)
                elif '//' not in item.name:
                    # already present databases are skipped; the warning is
                    # emitted only when the name has no '//'
                    warnings.warn(f"Database '{item.id}' is already added to the ACL"
                                  " and will not be rewritten with this command."
                                  f" Notice that databases' names are case insensitive.")
            elif isinstance(item, ACL.User):
                new_users.append(item)
            else:
                raise TypeError(f'Entity of type "{type(item)}" is not supported')

        candidates = (
            (new_dbs, self._add_db, self._remove_db),
            (new_users, self._add_user, self._remove_user),
        )
        operations = [operation for operation in candidates if operation[0]]

        _apply_to_entities(self, operations)

        self._added_dbs.extend(new_dbs)

    def remove(self, *entities):
        """
        Remove entities from the ACL and reload it. If it fails, then
        tries to roll back to the original state.

        Parameters
        ----------
        entities: DB or ACL.User

        Raises
        ------
        ValueError, TypeError, EntityOperationFailed
        """
        if not entities:
            raise ValueError("At least one argument in parameter 'entities' is expected.")

        old_dbs = []
        old_users = []

        for item in entities:
            if isinstance(item, _db.DB):
                # only databases added through this object can be removed
                if item not in self._added_dbs:
                    raise ValueError(f'DB "{item}" was not added')
                old_dbs.append(item)
            elif isinstance(item, ACL.User):
                old_users.append(item)
            else:
                raise TypeError(f'Entity of type "{type(item)}" is not supported')

        candidates = (
            (old_dbs, self._remove_db, self._add_db),
            (old_users, self._remove_user, self._add_user),
        )
        operations = [operation for operation in candidates if operation[0]]

        _apply_to_entities(self, operations)

        for db in old_dbs:
            self._added_dbs.remove(db)

    def reload(self, db=None):
        """
        Reload the access list config; does nothing when
        the object is not bound to a session.
        """
        if self._session_ref is None:
            return None
        return utils.reload_config(db, config_type='ACCESS_LIST')

    def _read_dbs(self):
        """ Same listing as :meth:`_dbs` """
        return self._dbs()

    def _dbs(self):
        """ Ids of the DB entries found in the acl file """
        query = GetAll()
        query.add_where(_acl.DB)
        self._apply_actions([query], print_writer=True)

        return [entry.id for entry in query.result]

    def _users(self):
        """ Names of the users found under the "Admin" role """
        query = GetAll()
        query.add_where(_acl.Role, name="Admin")
        query.add_where(_acl.User)
        self._apply_actions([query], print_writer=True)

        return [entry.name for entry in query.result]

    @property
    def databases(self):
        return self._dbs()

    @property
    def users(self):
        return self._users()
class ACLBuilder(_CommonBuilder_):
    """ Builds an ACL from a path, a file object, another ACL or nothing. """

    def build(self):
        """
        Construct an ACL according to the type of ``self.src``.

        Raises
        ------
        ValueError
            If ``self.src`` is of an unsupported type.
        """
        source = self.src
        params = dict(clean_up=self.clean_up, copy=self.copy, session_ref=self.session_ref)

        if isinstance(source, str):
            return ACL(source, **params)

        if isinstance(source, utils.File):
            return ACL(source.path, **params)

        if isinstance(source, ACL):
            return source.copy(**params)

        if source is None:
            return ACL(**params)

        raise ValueError(f'It is not allowed to build ACL from the object of type "{type(self.src)}"')
class Locator(_FileHandler_):

    def __init__(self, path=None, clean_up=True, copy=True, empty=False, session_ref=None):
        """
        Class representing OneTick database locator.

        Locator is the file that describes database name, location and other options.

        Parameters
        ----------
        path: str
            A path to custom locator file. Default is `None`, that means to generate a temporary locator.
        clean_up: bool
            If True, then temporary locator will be removed when Locator object will be destroyed.
            It is helpful for debug purpose.

            Default is `True`.
        copy: bool
            If `True`, then the passed custom locator by the ``path`` parameter will be copied
            firstly before usage. It might be used when you want to work with a custom locator,
            but don't want to change the original file; in that case a custom locator will be
            copied into a temporary locator and every request for modification will be executed
            for that temporary locator.

            Default is `True`.
        empty: bool
            If `True`, then a temporary locator will have no databases, otherwise it will have
            default otp.config.default_db and COMMON databases.

            Default is `False`.
        """
        copied = True

        # if path is set, then copy file, we should not work directly
        # with externally passed files
        if copy:
            if path:
                file_h = utils.TmpFile(".locator", clean_up=clean_up)
                # pass the destination as a plain path, same as ACL and Config do
                shutil.copyfile(path, file_h.path)
            else:
                file_h = utils.tmp_locator(clean_up=clean_up, empty=empty)
        else:
            if path:
                file_h = utils.PermanentFile(path)
                copied = False
            else:
                file_h = utils.tmp_locator(clean_up=clean_up, empty=empty)

        assert file_h is not None

        super().__init__(file_h, clean_up=clean_up, copied=copied, session_ref=session_ref)

        self._added_dbs = []
        self._added_ts = []

    def cleanup(self):
        """
        Remove databases and tick servers added through this object and reload the locator.
        """
        self._remove_db(self._added_dbs)
        self._remove_ts(str(server) for server in self._added_ts)
        self._added_dbs = []
        self._added_ts = []
        self.reload()

    @property
    def databases(self):
        return self._dbs()

    @property
    def tick_servers(self):
        return self._ts()

    def reload(self, db_=None):
        """
        Reload the locator config; does nothing when
        the object is not bound to a session.
        """
        if self._session_ref is not None:
            return utils.reload_config(db_, config_type='LOCATOR')

    def _apply_actions(self, actions, print_writer=False):
        """
        Run ``actions`` against the locator file. With ``print_writer`` set
        a ``PrintWriter`` is used instead of a ``FileWriter`` and flushing is disabled.
        """
        writer = PrintWriter() if print_writer else FileWriter(self.path)
        flush = not print_writer

        return apply_actions(_locator.parse_locator, FileReader(self.path), writer, actions, flush=flush)

    def _dbs(self):
        """ Ids of the DB entries found in the locator file """
        action = GetAll()
        action.add_where(_locator.DB)
        self._apply_actions([action], print_writer=True)

        return [db.id for db in action.result]

    def _ts(self):
        """ Locations of the ServerLocation entries found under TickServers """
        get_ts = GetAll()
        get_ts.add_where(_locator.TickServers)
        get_ts.add_where(_locator.ServerLocation)
        self._apply_actions([get_ts], print_writer=True)

        return [location.location for location in get_ts.result]

    def _add_db(self, dbs):
        """
        Add databases to locator file (without reloading): the DB entry itself,
        its locations, raw databases with their locations, and the feed if it is set.
        """
        actions = []

        for db in dbs:
            actions.append(Add(_locator.DB(id=db.id, **db.properties)))

            for location in db.locations:
                action = Add(_locator.Location(**location))
                action.add_where(_locator.DB, id=db.id)
                actions.append(action)

            for raw_db in db.raw_data:
                # everything except 'id' and 'locations' goes as raw db properties
                common = {k: v for k, v in raw_db.items() if k not in {'id', 'locations'}}
                action = Add(_locator.RawDB(id=raw_db['id'], **common))
                action.add_where(_locator.DB, id=db.id)
                actions.append(action)

                for location in raw_db['locations']:
                    action = Add(_locator.Location(**location))
                    action.add_where(_locator.DB, id=db.id)
                    action.add_where(_locator.RawDB, id=raw_db['id'])
                    actions.append(action)

            if db.feed:
                options = {k: v for k, v in db.feed.items() if k != 'type'}

                action = Add(_locator.Feed(type=db.feed['type']))
                action.add_where(_locator.DB, id=db.id)
                actions.append(action)

                action = Add(_locator.FeedOptions(**options))
                action.add_where(_locator.DB, id=db.id)
                action.add_where(_locator.Feed, type=db.feed['type'])
                actions.append(action)

        return self._apply_actions(actions)

    def _remove_db(self, dbs):
        """ Remove databases from locator file (without reloading) """
        actions = []

        for db in dbs:
            action = Delete()
            action.add_where(_locator.DB, id=db.id)
            actions.append(action)

        return self._apply_actions(actions)

    def _add_ts(self, servers):
        """
        Add servers to locator file (without reloading)

        Parameters
        ----------
        servers: RemoteTS
            Servers to be added to locator.
        """
        actions = []

        for server in servers:
            if server.cep:
                actions.append(Add(_locator.CEPServerLocation(location=str(server))))
            else:
                actions.append(Add(_locator.ServerLocation(location=str(server))))

        return self._apply_actions(actions)

    def _remove_ts(self, servers):
        """
        Remove servers from locator file (without reloading)

        Parameters
        ----------
        servers: RemoteTS
            Servers to remove from locator
        """
        # NOTE(review): only ServerLocation entries are deleted here, while _add_ts
        # may add CEPServerLocation entries - confirm whether those need removal too
        actions = []

        for server in servers:
            action = Delete()
            action.add_where(_locator.ServerLocation, location=str(server))
            actions.append(action)

        return self._apply_actions(actions)

    def _add_locator(self, locators):
        """
        Add references to locators

        Parameters
        ----------
        locators: Locator
        """
        actions = [Add(_locator.Include(path=locator.path)) for locator in locators]

        return self._apply_actions(actions)

    def _remove_locator(self, locators):
        """
        Remove references for locators

        Parameters
        ----------
        locators: Locator
        """
        actions = []

        for locator in locators:
            action = Delete()
            action.add_where(_locator.Include, path=locator.path)
            actions.append(action)

        return self._apply_actions(actions)

    def add(self, *entities):
        """
        Add entities to the locator and reload it. If it fails, then
        tries to roll back to the original state.

        Parameters
        ----------
        entities: DB, RemoteTS or Locator

        Raises
        ------
        ValueError, TypeError, EntityOperationFailed
        """
        if not entities:
            raise ValueError("At least one argument in parameter 'entities' is expected.")

        dbs = []
        servers = []
        locators = []

        for entity in entities:
            if isinstance(entity, _db.db._DB):
                if self._db_in_dbs_case_insensitive(entity.id, self.databases):
                    # already present databases are skipped; the warning is
                    # emitted only when the name has no '//'
                    if '//' not in entity.name:
                        warnings.warn(f"Database '{entity.id}' is already added to the Locator"
                                      " and will not be rewritten with this command."
                                      f" Notice that databases' names are case insensitive.")
                    continue
                dbs.append(entity)
            elif isinstance(entity, _servers.RemoteTS):
                servers.append(entity)
            elif isinstance(entity, Locator):
                locators.append(entity)
            else:
                raise TypeError(f'Entity of type "{type(entity)}" is not supported')

        operations = []
        if dbs:
            operations.append((dbs, self._add_db, self._remove_db))
        if servers:
            operations.append((servers, self._add_ts, self._remove_ts))
        if locators:
            operations.append((locators, self._add_locator, self._remove_locator))

        _apply_to_entities(self, operations)

        self._added_dbs.extend(dbs)
        self._added_ts.extend(servers)

    def remove(self, *entities):
        """
        Remove entities from the locator and reload it. If it fails, then
        tries to roll back to the original state.

        Parameters
        ----------
        entities: DB, RemoteTS or Locator
            Databases and tick servers must have been added through this object.

        Raises
        ------
        ValueError, TypeError, EntityOperationFailed
        """
        if not entities:
            raise ValueError("At least one argument in parameter 'entities' is expected.")

        dbs = []
        servers = []
        locators = []

        for entity in entities:
            if isinstance(entity, _db.db._DB):
                if entity not in self._added_dbs:
                    raise ValueError(f'DB "{entity}" was not added')
                dbs.append(entity)
            elif isinstance(entity, _servers.RemoteTS):
                if entity not in self._added_ts:
                    raise ValueError(f'Tick server "{entity}" was not added')
                servers.append(entity)
            elif isinstance(entity, Locator):
                locators.append(entity)
            else:
                raise TypeError(f'Entity of type "{type(entity)}" is not supported')

        operations = []
        if dbs:
            operations.append((dbs, self._remove_db, self._add_db))
        if servers:
            operations.append((servers, self._remove_ts, self._add_ts))
        if locators:
            operations.append((locators, self._remove_locator, self._add_locator))

        _apply_to_entities(self, operations)

        for db in dbs:
            self._added_dbs.remove(db)
        for server in servers:
            self._added_ts.remove(server)

    def __contains__(self, item):
        """ ``item in locator`` is True when ``str(item)`` is one of the locator databases """
        return str(item) in self.databases
class LocatorBuilder(_CommonBuilder_):
    """ Builds a Locator from a path, a file object, another Locator, a RemoteTS or nothing. """

    def build(self):
        """
        Construct a Locator according to the type of ``self.src``.

        Raises
        ------
        ValueError
            If ``self.src`` is of an unsupported type.
        """
        source = self.src
        params = dict(clean_up=self.clean_up, copy=self.copy, session_ref=self.session_ref)

        if isinstance(source, str):
            return Locator(source, **params)

        if isinstance(source, utils.File):
            return Locator(source.path, **params)

        if isinstance(source, Locator):
            return source.copy(**params)

        if isinstance(source, _servers.RemoteTS):
            # an empty locator that only references the passed tick server
            result = Locator(empty=True, **params)
            result.add(source)
            return result

        if source is None:
            return Locator(**params)

        raise ValueError(f'It is not allowed to build Locator from the object of type "{type(self.src)}"')
class Config(_FileHandler_):

    # config variables that have a dedicated constructor parameter
    # and therefore are rejected in ``variables``
    _CONFIG_VARIABLES_PASSED_VIA_THEIR_OWN_PARAMETER = {
        'ACCESS_CONTROL_FILE': 'acl',
        'DB_LOCATOR.DEFAULT': 'locator',
        'OTQ_FILE_PATH': 'otq_path',
        'CSV_FILE_PATH': 'csv_path',
        'LICENSE_REPOSITORY_DIR': 'license',
        'ONE_TICK_LICENSE_FILE': 'license',
    }

    def __init__(
        self,
        config=None,
        locator=None,
        acl=None,
        otq_path=None,
        csv_path=None,
        clean_up=True,
        copy=True,
        session_ref=None,
        license=None,
        variables=None,
    ):
        """
        Parameters
        ----------
        config: path or Config
            Allows to specify a custom config. None is to use temporary generated config.

            Default is None.
        locator: Locator
            Allows to specify a custom locator file. None is to use temporary generated locator.

            Default is None.
        acl: ACL
            Allows to specify a custom acl file. None is to use temporary generated acl.

            Default is None.
        otq_path: list of paths to lookup queries
            OTQ_PATH parameter in the OneTick config file.

            Default is None, that is equal to the empty list.
        csv_path: list of paths to lookup csv files
            CSV_PATH parameter in the OneTick config file.

            Default is None, that is equal to the empty list.
        clean_up: bool
            If True, then temporary config file will be removed when the Config instance will be
            destroyed. It is helpful for debug purpose.

            Default is True.
        copy: bool
            If True, then the passed custom config file will be copied firstly before any usage
            with it. It might be used when you want to work with a custom config file, but don't
            want to change the original file; in that case a custom config will be copied into
            a temporary config file and every request for modification will be executed for that
            temporary config.

            Default is True.
        license: instance from the onetick.py.license module
            License to use. If it is not set, then onetick.py.license.Default is used.
        variables: dict
            Other values to pass to config.

        Raises
        ------
        ValueError
            If ``config`` is passed along with ``locator`` or ``acl``, or if ``variables``
            contains a variable that has its own parameter.
        """
        if config and (locator or acl):
            raise ValueError("It is not allowed to use 'config' parameter along with 'locator' or 'acl'")

        # builders that construct locator and acl based on parameters
        acl_builder = ACLBuilder(src=acl, clean_up=clean_up, copy=copy, session_ref=session_ref)
        locator_builder = LocatorBuilder(src=locator, clean_up=clean_up, copy=copy, session_ref=session_ref)

        config_copied = True

        if config:
            # ``config`` may be a Config object, both branches below need its path
            config_path = config.path if isinstance(config, Config) else config

            # copy passed file, we should not work with externally passed files
            if copy:
                self._file = utils.TmpFile(".cfg", clean_up=clean_up)
                shutil.copyfile(config_path, self._file.path)
            else:
                self._file = utils.PermanentFile(config_path)
                config_copied = False

            # acl and locator referenced by the passed config become sources for the builders
            if utils.is_param_in_config(self._file.path, "ACCESS_CONTROL_FILE"):
                acl_builder.src = utils.get_config_param(self._file.path, "ACCESS_CONTROL_FILE")
            if utils.is_param_in_config(self._file.path, "DB_LOCATOR.DEFAULT"):
                locator_builder.src = utils.get_config_param(self._file.path, "DB_LOCATOR.DEFAULT")
        else:
            self._file = utils.tmp_config(clean_up=clean_up)

        self._acl = acl_builder.build()
        os.environ["ONE_TICK_SESSION_ACL_PATH"] = self._acl.path
        self._locator = locator_builder.build()
        os.environ["ONE_TICK_SESSION_LOCATOR_PATH"] = self._locator.path

        super().__init__(self._file, clean_up=clean_up, copied=config_copied, session_ref=session_ref)

        # Here we can start to modify files - they are either copied or generated
        # ------------------------------------------------------------------------
        utils.modify_config_param(self.path, "ACCESS_CONTROL_FILE", self._acl.path, throw_on_missing=False)
        utils.modify_config_param(self.path, "DB_LOCATOR.DEFAULT", self._locator.path, throw_on_missing=False)

        if otq_path:
            otq_path = map(str, otq_path)
            utils.modify_config_param(self.path, "OTQ_FILE_PATH", ",".join(otq_path), throw_on_missing=False)

        if csv_path:
            csv_path = map(str, csv_path)
            utils.modify_config_param(self.path, "CSV_FILE_PATH", ",".join(csv_path), throw_on_missing=False)

        variables = variables or {}
        for parameter_name, parameter_value in variables.items():
            if parameter_name in self._CONFIG_VARIABLES_PASSED_VIA_THEIR_OWN_PARAMETER:
                raise ValueError(
                    f'Variable {parameter_name} should be set via '
                    f'{self._CONFIG_VARIABLES_PASSED_VIA_THEIR_OWN_PARAMETER[parameter_name]} parameter'
                )
            if isinstance(parameter_value, list):
                parameter_value = ",".join(map(str, parameter_value))
            utils.modify_config_param(self.path, parameter_name, parameter_value, throw_on_missing=False)

        # set license
        # ---------------------------
        # the config defines a custom license only when both parameters are present
        custom_license = utils.is_param_in_config(self.path, "LICENSE_REPOSITORY_DIR")
        custom_license &= utils.is_param_in_config(self.path, "ONE_TICK_LICENSE_FILE")

        if license:
            self._license = license
        elif custom_license:
            lic_file = utils.get_config_param(self.path, "ONE_TICK_LICENSE_FILE")
            lic_dir = utils.get_config_param(self.path, "LICENSE_REPOSITORY_DIR")
            self._license = _license.Custom(lic_file, lic_dir)
        elif isinstance(locator, _servers.RemoteTS):
            self._license = _license.Remote()
        else:
            self._license = _license.Default()

        if not custom_license:
            # no need to set already defined custom values
            if self._license.dir:
                utils.modify_config_param(
                    self.path, "LICENSE_REPOSITORY_DIR", self._license.dir, throw_on_missing=False
                )
            if self._license.file:
                utils.modify_config_param(
                    self.path, "ONE_TICK_LICENSE_FILE", self._license.file, throw_on_missing=False
                )

    @property
    def acl(self):
        return self._acl

    @property
    def locator(self):
        return self._locator

    @property
    def license(self):
        return self._license

    def copy(self, clean_up=True, copy=True, session_ref=None):
        """ overridden version of copy: additionally passes the license of this config """
        return self.__class__(self.path,
                              clean_up=clean_up,
                              copy=copy,
                              session_ref=session_ref,
                              license=self._license)

    @staticmethod
    def build(obj=None, clean_up=True, copy=True, session_ref=None):
        """
        Construct a Config from a path, a file object, another Config or nothing.

        Raises
        ------
        ValueError
            If ``obj`` is of an unsupported type.
        """
        params = {"clean_up": clean_up, "copy": copy, "session_ref": session_ref}

        if isinstance(obj, str):
            return Config(obj, **params)
        elif isinstance(obj, utils.File):
            return Config(obj.path, **params)
        elif isinstance(obj, Config):
            return obj.copy(**params)
        elif obj is None:
            return Config(**params)

        raise ValueError(f'It is not allowed to build Config from the object of type "{type(obj)}"')

    def cleanup(self):
        # no logic to clean up content of the config itself
        self._acl.cleanup()
        self._locator.cleanup()

    @property
    def otq_path(self):
        """ Value of OTQ_FILE_PATH in the config, or None if reading it raises AttributeError """
        try:
            return utils.get_config_param(self.path, "OTQ_FILE_PATH")
        except AttributeError:
            return None
class Session(object):
    """
    A class for setting up working OneTick session.
    It keeps configuration files during the session and allows to manage them.
    When instance is out of scope, then instance cleans up config files and configuration.
    You can leave the scope manually with method :py:meth:`close`.
    Also, session is closed automatically if this object is used as a context manager.

    It is allowed to have only one alive session instance in the process.

    If you don't use Session's instance, then ``ONE_TICK_CONFIG`` environment variable
    should be set to be able to work with OneTick.

    If config file is not set then temporary is generated.
    Config includes locator and acl file, and if they are not set, then they are generated.

    Parameters
    ----------
    config : str, :py:class:`onetick.py.session.Config`, optional
        Path to an existing OneTick config file; if it is not set, then config will be generated.
        If config is not set, then temporary config is generated. Default is None.
    clean_up : bool, optional
        A flag to control cleaning up process: if it is True then all temporary generated
        files will be automatically removed. It is helpful for debugging. The flag affects only
        generated files, but does not externally passed. Default is True.
    copy : bool, optional
        A flag to control file copy process: if it is True then all externally passed files
        will be copied before usage, otherwise all modifications during an existing session
        happen directly with passed config files. NOTE: we suggest to set this flag only when
        you fully understand it's effect. Default is True.
    override_env : bool, optional
        If flag is True, then unconditionally ``ONE_TICK_CONFIG`` environment variable will be
        overridden with a config that belongs to a Session. Otherwise ``ONE_TICK_CONFIG`` will
        be defined in the scope of session only when it is not defined externally.
        For example, it is helpful when you test ascii_loader that uses 'ONE_TICK_CONFIG' only.
        Default is False ( default is False, because overriding external environment variable
        might be not obvious and desirable )
    redirect_logs: bool, optional
        If flag is True, then OneTick logs will be redirected into a temporary log file.
        Otherwise logs will be mixed with output. Default is True.

    Examples
    --------

    If session is defined with environment, OneTick can be used right away:

    >>> 'ONE_TICK_CONFIG' in os.environ
    True
    >>> list(otp.databases())  # doctest: +ELLIPSIS
    ['COMMON', 'DEMO_L1', ..., 'SOME_DB']
    >>> data = otp.DataSource('SOME_DB', symbol='S1', tick_type='TT')
    >>> otp.run(data)
                         Time  X
    0 2003-12-01 00:00:00.000  1
    1 2003-12-01 00:00:00.001  2
    2 2003-12-01 00:00:00.002  3
    """

    # TODO: create article for Session in Guides or Concepts

    # the only alive session of the process, or None
    _instance = None

    def __init__(self, config=None, clean_up=True, copy=True, override_env=False, redirect_logs=True):
        self._construct(config, clean_up, copy, override_env, redirect_logs)

    def _construct(self, config=None, clean_up=True, copy=True, override_env=False, redirect_logs=True):
        """
        Build the session config, set up environment variables and initialize OneTickLib.

        Raises
        ------
        MultipleSessionsException
            If another session is already alive in the process.
        """
        if Session._instance:
            raise MultipleSessionsException(
                "It is forbidden to use multiple active sessions simultaniously in one process"
            )

        def onetick_cfg_rollback(var):
            """ function to rollback ONE_TICK_CONFIG state """

            def _impl():
                if var is None:
                    if "ONE_TICK_CONFIG" in os.environ:
                        del os.environ["ONE_TICK_CONFIG"]
                else:
                    os.environ["ONE_TICK_CONFIG"] = var

            return _impl

        self._lib = None
        self._log_file = None
        # remember the external value to restore it on close or on failure
        self._env_rollback = onetick_cfg_rollback(os.environ.get("ONE_TICK_CONFIG", None))
        self._override_env = override_env
        self._config = Config.build(config, clean_up=clean_up, copy=copy, session_ref=self)

        os.environ["ONE_TICK_SESSION_CFG_PATH"] = self._config.path

        try:
            if "ONE_TICK_CONFIG" not in os.environ:
                os.environ["ONE_TICK_CONFIG"] = self._config.path
            else:
                if override_env:
                    os.environ["ONE_TICK_CONFIG"] = self._config.path
                else:
                    warnings.warn(
                        UserWarning(
                            "ONE_TICK_CONFIG env variable has been set before a session, "
                            "and in the session scope it is not related to the session config. "
                            "If you want to make ONE_TICK_CONFIG env variable be consistent "
                            "with the session, then look at the override_env flag "
                            "for the Session constructor"
                        )
                    )

            OneTickLib().cleanup()

            if redirect_logs:
                self._log_file = utils.TmpFile(suffix=".onetick.log", clean_up=clean_up)
                self._lib = OneTickLib(self._config.path, log_file=self._log_file.path)
            else:
                # NOTE(review): assumes that OneTickLib does not redirect logs
                # when log_file is not passed - confirm
                self._lib = OneTickLib(self._config.path)
        except BaseException:
            self._env_rollback()  # TODO: rollback, but need to wait BDS-91
            raise

        Session._instance = self
        self._ts_dbs = {}

    def use(self, *items):
        """
        Makes DB or TS available inside the session.

        Parameters
        ----------
        items : :py:class:`~onetick.py.DB` or :py:class:`~onetick.py.servers.RemoteTS` objects
            Items to be added to session.

        Examples
        --------

        (note that ``session`` is created before this example)

        >>> list(otp.databases())  # doctest: +ELLIPSIS
        ['COMMON', 'DEMO_L1', ..., 'SOME_DB']

        >>> new_db = otp.DB('ZZZZ')
        >>> session.use(new_db)
        >>> list(otp.databases())  # doctest: +ELLIPSIS
        ['COMMON', 'DEMO_L1', ..., 'SOME_DB', 'ZZZZ']
        """
        self.locator.add(*items)

        # only databases go to the ACL
        dbs = [item for item in items if isinstance(item, _db.db._DB)]

        try:
            if dbs:
                self.acl.add(*dbs)
        except Exception:
            # keep locator and ACL consistent with each other
            self.locator.remove(*items)
            raise

    def use_stub(self, stub_name):
        """
        Adds stub-DB into the session.
        The shortcut for ``.use(otp.DB(stub_name))``

        Parameters
        ----------
        stub_name : str
            name of the stub
        """
        return self.use(_db.DB(stub_name))

    def close(self):
        """
        Close session
        """
        # nothing to do for a session that is not the active one
        if Session._instance is not self:
            return

        try:
            if self._config:
                del self._config
                self._config = None
        finally:
            if self._lib:
                self._lib.cleanup()
                self._lib = None

            self._env_rollback()

            # del os.environ['ONE_TICK_SESSION_CFG_PATH']
            # del os.environ['ONE_TICK_SESSION_ACL_PATH']
            # del os.environ['ONE_TICK_SESSION_LOCATOR_PATH']

            Session._instance = None

    def __enter__(self):
        """ Allows ``with Session(...) as session:`` """
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """ Closes the session on leaving the ``with`` block; exceptions are not suppressed """
        self.close()

    def __del__(self):
        self.close()

    @property
    def config(self):
        """
        A reference to the underlying Config object that represents OneTick config file.

        Returns
        -------
        :py:class:`onetick.py.session.Config`
        """
        return self._config

    @config.setter
    def config(self, cfg):
        # the session is re-created with the new config;
        # only the override_env flag is carried over
        self.close()
        self._construct(cfg, override_env=self._override_env)

    @property
    def acl(self):
        """
        A reference to the underlying ACL object that represents OneTick access control list file.

        Returns
        -------
        :py:class:`onetick.py.session.ACL`
        """
        return self._config.acl

    @property
    def locator(self):
        """
        A reference to the underlying Locator that represents OneTick locator file.

        Returns
        -------
        :py:class:`onetick.py.session.Locator`
        """
        return self._config.locator

    @property
    def license(self):
        return self._config.license

    @property
    def ts_databases(self):
        # NOTE(review): _get_ts_dbs is not defined in the visible part of this class - confirm
        self._get_ts_dbs()
        return self._ts_dbs

    @property
    def databases(self):
        # NOTE(review): _available_dbs is not defined in the visible part of this class - confirm
        return self._available_dbs()
class TestSession(Session):

    def __init__(self, *args, **kwargs):
        """
        This class does the same as :py:class:`onetick.py.session.Session`
        but also define default values.
        """
        # defaults are written into the package configuration
        # before the session itself is constructed
        defaults = (
            ('tz', 'EST5EDT'),
            ('default_db', 'DEMO_L1'),
            ('default_symbol', 'AAPL'),
            ('default_start_time', datetime(2003, 12, 1, 0, 0, 0)),
            ('default_end_time', datetime(2003, 12, 4, 0, 0, 0)),
        )
        for option, value in defaults:
            config[option] = value

        super().__init__(*args, **kwargs)