fromonetick.pyimporttypesasottfromonetick.py.core._internal._state_objectsimport(_StateColumn,TickList,TickSet,TickSetUnordered,TickDeque,_TickListTick,_TickSetTick,_TickDequeTick,_DynamicTick,)AVAILABLE_SCOPES=("BRANCH",# A branch-scope state variable is visible to all event processors of the branch its declarator EP# belongs to (a branch is a maximal long chain of EPs, each node of which has at most 1 _source and at most 1 sink)."ALL_INPUTS",# An all-inputs-scope state variable is visible to all event processors of the input subgraph of its# declarator EP (input subgraph of a particular node is the subset of nodes directly or indirectly feeding it)."ALL_OUTPUTS",# An all-outputs-scope state variable is visible to all event processors of the output subgraph of# its declarator EP (output subgraph of a particular node is the subset of nodes directly or indirectly fed by it)."QUERY",# A query scope state variable is visible to all event processors of the graph."CROSS_SYMBOL",# A cross-symbol scope state variable is visible to all event processors across all unbound# symbols. Cross-symbol scope state variables cannot be modified after initialization.)
[docs]defvar(default_value,scope="query"):""" Defines a state variable. Supports int, float and string values. Parameters ---------- default_value: any Default value of the state variable scope: str Scope for the state variable. Possible values are: query, branch, cross_symbol. Default: query Returns ------- a state variable that should be assigned to a _source Examples -------- >>> data = otp.Ticks(dict(X=[0, 1, 2])) >>> data.state_vars['SUM'] = otp.state.var(0) >>> data.state_vars['SUM'] += data['X'] >>> data['SUM'] = data.state_vars['SUM'] >>> otp.run(data)[['X', 'SUM']] X SUM 0 0 0 1 1 1 2 2 3 """scope=_validate_and_preprocess_scope(scope)dtype=ott.get_object_type(default_value)ifdtypeisott.msectime:raiseTypeError("State variables do not support msectime type")# elif dtype is ott.nsectime:# raise TypeError("State variables do not support nsectime type")elifott.is_time_type(dtype):dtype=ott.nsectimeiftype(default_value)isstr:iflen(default_value)>ott.string.DEFAULT_LENGTH:dtype=ott.string[len(default_value)]res=_StateColumn("",dtype,obj_ref=None,default_value=default_value,scope=scope)returnres
[docs]deftick_list(default_value=None,scope='query')->TickList:""" Defines a state tick list. Parameters ---------- default_value: :py:func:`eval query <onetick.py.eval>` Evaluated query to initialize tick list from. scope: str Scope for the state variable. Possible values are: query, branch, cross_symbol, all_inputs, all_outputs Examples -------- >>> def fsq(): ... return otp.Ticks(B=[1, 2, 3]) >>> data = otp.Tick(A=1) >>> data.state_vars['LIST'] = otp.state.tick_list(otp.eval(fsq)) >>> data = data.state_vars['LIST'].dump() >>> otp.run(data)[['B']] B 0 1 1 2 2 3 """scope=_validate_and_preprocess_scope(scope)returnTickList('',obj_ref=None,default_value=default_value,scope=scope)
[docs]deftick_set(insertion_policy,key_fields,default_value=None,scope='query')->TickSet:""" Defines a state tick set. Parameters ---------- insertion_policy: 'oldest' or 'latest' 'oldest' specifies not to overwrite ticks with the same keys. 'latest' makes the last inserted tick overwrite the one with the same keys (if existing). key_fields: str, list of str The values of the specified fields will be used as keys. default_value: :py:func:`eval query <onetick.py.eval>` Evaluated query to initialize tick set from. scope: str Scope for the state variable. Possible values are: query, branch, cross_symbol, all_inputs, all_outputs Examples -------- >>> def fsq(): ... return otp.Ticks(B=[1, 1, 2, 2, 3, 3]) >>> data = otp.Tick(A=1) >>> data.state_vars['SET'] = otp.state.tick_set('oldest', 'B', otp.eval(fsq)) >>> data = data.state_vars['SET'].dump() >>> otp.run(data)[['B']] B 0 1 1 2 2 3 """scope=_validate_and_preprocess_scope(scope)returnTickSet('',insertion_policy=insertion_policy,key_fields=key_fields,obj_ref=None,default_value=default_value,scope=scope)
[docs]deftick_set_unordered(insertion_policy,key_fields,default_value=None,max_distinct_keys=-1,scope='query')->TickSetUnordered:""" Defines unordered tick set. Parameters ---------- insertion_policy: 'oldest' or 'latest' 'oldest' specifies not to overwrite ticks with the same keys. 'latest' makes the last inserted tick overwrite the one with the same keys (if existing). key_fields: str, list of str max_distinct_keys: int maximum expected size of the set or -1, if the number of ticks is not known. default_value: :py:func:`eval query <onetick.py.eval>` Evaluated query to initialize unordered tick set from. scope: str, Scope for the state variable. Possible values are: query, branch, cross_symbol, all_inputs, all_outputs Examples -------- >>> def fsq(): ... return otp.Ticks(B=[1, 1, 2, 2, 3, 3]) >>> data = otp.Tick(A=1) >>> data.state_vars['SET'] = otp.state.tick_set_unordered('oldest', 'B', otp.eval(fsq)) >>> data = data.state_vars['SET'].dump() >>> otp.run(data)[['B']] B 0 1 1 2 2 3 """scope=_validate_and_preprocess_scope(scope)returnTickSetUnordered('',insertion_policy=insertion_policy,max_distinct_keys=max_distinct_keys,key_fields=key_fields,obj_ref=None,default_value=default_value,scope=scope)
[docs]deftick_deque(default_value=None,scope='query')->TickDeque:""" Defines a state tick deque. Parameters ---------- default_value: :py:func:`eval query <onetick.py.eval>` Evaluated query to initialize tick deque from. scope: str Scope for the state variable. Possible values are: query, branch, cross_symbol, all_inputs, all_outputs Examples -------- >>> def fsq(): ... return otp.Ticks(B=[1, 2, 3]) >>> data = otp.Tick(A=1) >>> data.state_vars['DEQUE'] = otp.state.tick_deque(otp.eval(fsq)) >>> data = data.state_vars['DEQUE'].dump() >>> otp.run(data)[['B']] B 0 1 1 2 2 3 """scope=_validate_and_preprocess_scope(scope)returnTickDeque('',obj_ref=None,default_value=default_value,scope=scope)
def_validate_and_preprocess_scope(scope):ifisinstance(scope,str):scope=scope.upper()ifscopenotinAVAILABLE_SCOPES:raiseValueError(f"unknown scope {scope}, please use one of {AVAILABLE_SCOPES}")else:raiseValueError(f"scope should be one of the following strings: {AVAILABLE_SCOPES}")returnscope