from onetick.py import types as ott
from onetick.py.core._internal._state_objects import (
_StateColumn, TickList, TickSet, TickSetUnordered, TickDeque,
_TickListTick, _TickSetTick, _TickDequeTick, _DynamicTick,
)
AVAILABLE_SCOPES = (
"BRANCH", # A branch-scope state variable is visible to all event processors of the branch its declarator EP
# belongs to (a branch is a maximal long chain of EPs, each node of which has at most 1 _source and at most 1 sink).
"ALL_INPUTS", # An all-inputs-scope state variable is visible to all event processors of the input subgraph of its
# declarator EP (input subgraph of a particular node is the subset of nodes directly or indirectly feeding it).
"ALL_OUTPUTS", # An all-outputs-scope state variable is visible to all event processors of the output subgraph of
# its declarator EP (output subgraph of a particular node is the subset of nodes directly or indirectly fed by it).
"QUERY", # A query scope state variable is visible to all event processors of the graph.
"CROSS_SYMBOL", # A cross-symbol scope state variable is visible to all event processors across all unbound
# symbols. Cross-symbol scope state variables cannot be modified after initialization.
)
[docs]def var(default_value, scope="query"):
"""
Defines a state variable. Supports int, float and string values.
Parameters
----------
default_value: any
Default value of the state variable
scope: str
Scope for the state variable. Possible values are: query, branch, cross_symbol.
Default: query
Returns
-------
a state variable that should be assigned to a _source
Examples
--------
>>> data = otp.Ticks(dict(X=[0, 1, 2]))
>>> data.state_vars['SUM'] = otp.state.var(0)
>>> data.state_vars['SUM'] += data['X']
>>> data['SUM'] = data.state_vars['SUM']
>>> otp.run(data)[['X', 'SUM']]
X SUM
0 0 0
1 1 1
2 2 3
"""
scope = _validate_and_preprocess_scope(scope)
dtype = ott.get_object_type(default_value)
if dtype is ott.msectime:
raise TypeError("State variables do not support msectime type")
# elif dtype is ott.nsectime:
# raise TypeError("State variables do not support nsectime type")
elif ott.is_time_type(dtype):
dtype = ott.nsectime
if type(default_value) is str:
if len(default_value) > ott.string.DEFAULT_LENGTH:
dtype = ott.string[len(default_value)]
res = _StateColumn("", dtype, obj_ref=None, default_value=default_value, scope=scope)
return res
[docs]def tick_list(default_value=None, scope='query') -> TickList:
"""
Defines a state tick list.
Parameters
----------
default_value: :py:func:`eval query <onetick.py.eval>`
Eval query to initialize tick list from.
scope: str
Scope for the state variable.
Possible values are: query, branch, cross_symbol, all_inputs, all_outputs
Examples
--------
>>> def fsq():
... return otp.Ticks(B=[1, 2, 3])
>>> data = otp.Tick(A=1)
>>> data.state_vars['LIST'] = otp.state.tick_list(otp.eval(fsq))
>>> data = data.state_vars['LIST'].dump()
>>> otp.run(data)[['B']]
B
0 1
1 2
2 3
"""
scope = _validate_and_preprocess_scope(scope)
return TickList('', obj_ref=None, default_value=default_value, scope=scope)
[docs]def tick_set(insertion_policy, key_fields, default_value=None, scope='query') -> TickSet:
"""
Defines a state tick set.
Parameters
----------
insertion_policy: 'oldest' or 'latest'
'oldest' specifies not to overwrite ticks with the same keys.
'latest' makes the last inserted tick overwrite the one with the same keys (if existing).
key_fields: str, list of str
default_value: :py:func:`eval query <onetick.py.eval>`
Eval query to initialize tick set from.
scope: str
Scope for the state variable.
Possible values are: query, branch, cross_symbol, all_inputs, all_outputs
Examples
--------
>>> def fsq():
... return otp.Ticks(B=[1, 1, 2, 2, 3, 3])
>>> data = otp.Tick(A=1)
>>> data.state_vars['SET'] = otp.state.tick_set('oldest', 'B', otp.eval(fsq))
>>> data = data.state_vars['SET'].dump()
>>> otp.run(data)[['B']]
B
0 1
1 2
2 3
"""
scope = _validate_and_preprocess_scope(scope)
return TickSet('',
insertion_policy=insertion_policy, key_fields=key_fields,
obj_ref=None, default_value=default_value, scope=scope)
[docs]def tick_set_unordered(insertion_policy,
key_fields,
default_value=None,
max_distinct_keys=-1,
scope='query') -> TickSetUnordered:
"""
Defines unordered tick set.
Parameters
----------
insertion_policy: 'oldest' or 'latest'
'oldest' specifies not to overwrite ticks with the same keys.
'latest' makes the last inserted tick overwrite the one with the same keys (if existing).
key_fields: str, list of str
max_distinct_keys: int
maximum expected size of the set or -1, if the number of ticks is not known.
default_value: :py:func:`eval query <onetick.py.eval>`
Eval query to initialize unordered tick set from.
scope: str,
Scope for the state variable.
Possible values are: query, branch, cross_symbol, all_inputs, all_outputs
Examples
--------
>>> def fsq():
... return otp.Ticks(B=[1, 1, 2, 2, 3, 3])
>>> data = otp.Tick(A=1)
>>> data.state_vars['SET'] = otp.state.tick_set_unordered('oldest', 'B', otp.eval(fsq))
>>> data = data.state_vars['SET'].dump()
>>> otp.run(data)[['B']]
B
0 1
1 2
2 3
"""
scope = _validate_and_preprocess_scope(scope)
return TickSetUnordered(
'',
insertion_policy=insertion_policy, max_distinct_keys=max_distinct_keys, key_fields=key_fields,
obj_ref=None, default_value=default_value, scope=scope
)
[docs]def tick_deque(default_value=None, scope='query') -> TickDeque:
"""
Defines a state tick deque.
Parameters
----------
default_value: :py:func:`eval query <onetick.py.eval>`
Eval query to initialize tick deque from.
scope: str
Scope for the state variable.
Possible values are: query, branch, cross_symbol, all_inputs, all_outputs
Examples
--------
>>> def fsq():
... return otp.Ticks(B=[1, 2, 3])
>>> data = otp.Tick(A=1)
>>> data.state_vars['DEQUE'] = otp.state.tick_deque(otp.eval(fsq))
>>> data = data.state_vars['DEQUE'].dump()
>>> otp.run(data)[['B']]
B
0 1
1 2
2 3
"""
scope = _validate_and_preprocess_scope(scope)
return TickDeque('', obj_ref=None, default_value=default_value, scope=scope)
def _validate_and_preprocess_scope(scope):
if isinstance(scope, str):
scope = scope.upper()
if scope not in AVAILABLE_SCOPES:
raise ValueError(f"unknown scope {scope}, please use one of {AVAILABLE_SCOPES}")
else:
raise ValueError(f"scope should be one of the following strings: {AVAILABLE_SCOPES}")
return scope