Source code for onetick.py.state

from onetick.py import types as ott
from onetick.py.core._internal._state_objects import (
    _StateColumn, TickList, TickSet, TickSetUnordered, TickDeque,
    _TickListTick, _TickSetTick, _TickDequeTick, _DynamicTick,
)


AVAILABLE_SCOPES = (
    "BRANCH",  # A branch-scope state variable is visible to all event processors of the branch its declarator EP
    # belongs to (a branch is a maximal long chain of EPs, each node of which has at most 1 _source and at most 1 sink).
    "ALL_INPUTS",  # An all-inputs-scope state variable is visible to all event processors of the input subgraph of its
    # declarator EP (input subgraph of a particular node is the subset of nodes directly or indirectly feeding it).
    "ALL_OUTPUTS",  # An all-outputs-scope state variable is visible to all event processors of the output subgraph of
    # its declarator EP (output subgraph of a particular node is the subset of nodes directly or indirectly fed by it).
    "QUERY",  # A query scope state variable is visible to all event processors of the graph.
    "CROSS_SYMBOL",  # A cross-symbol scope state variable is visible to all event processors across all unbound
    # symbols. Cross-symbol scope state variables cannot be modified after initialization.
)


[docs]def var(default_value, scope="query"): """ Defines a state variable. Supports int, float and string values. Parameters ---------- default_value: any Default value of the state variable scope: str Scope for the state variable. Possible values are: query, branch, cross_symbol. Default: query Returns ------- a state variable that should be assigned to a _source Examples -------- >>> data = otp.Ticks(dict(X=[0, 1, 2])) >>> data.state_vars['SUM'] = otp.state.var(0) >>> data.state_vars['SUM'] += data['X'] >>> data['SUM'] = data.state_vars['SUM'] >>> otp.run(data)[['X', 'SUM']] X SUM 0 0 0 1 1 1 2 2 3 """ scope = _validate_and_preprocess_scope(scope) dtype = ott.get_object_type(default_value) if dtype is ott.msectime: raise TypeError("State variables do not support msectime type") # elif dtype is ott.nsectime: # raise TypeError("State variables do not support nsectime type") elif ott.is_time_type(dtype): dtype = ott.nsectime if type(default_value) is str: if len(default_value) > ott.string.DEFAULT_LENGTH: dtype = ott.string[len(default_value)] res = _StateColumn("", dtype, obj_ref=None, default_value=default_value, scope=scope) return res
[docs]def tick_list(default_value=None, scope='query') -> TickList: """ Defines a state tick list. Parameters ---------- default_value: :py:func:`eval query <onetick.py.eval>` Evaluated query to initialize tick list from. scope: str Scope for the state variable. Possible values are: query, branch, cross_symbol, all_inputs, all_outputs Examples -------- >>> def fsq(): ... return otp.Ticks(B=[1, 2, 3]) >>> data = otp.Tick(A=1) >>> data.state_vars['LIST'] = otp.state.tick_list(otp.eval(fsq)) >>> data = data.state_vars['LIST'].dump() >>> otp.run(data)[['B']] B 0 1 1 2 2 3 """ scope = _validate_and_preprocess_scope(scope) return TickList('', obj_ref=None, default_value=default_value, scope=scope)
[docs]def tick_set(insertion_policy, key_fields, default_value=None, scope='query') -> TickSet: """ Defines a state tick set. Parameters ---------- insertion_policy: 'oldest' or 'latest' 'oldest' specifies not to overwrite ticks with the same keys. 'latest' makes the last inserted tick overwrite the one with the same keys (if existing). key_fields: str, list of str The values of the specified fields will be used as keys. default_value: :py:func:`eval query <onetick.py.eval>` Evaluated query to initialize tick set from. scope: str Scope for the state variable. Possible values are: query, branch, cross_symbol, all_inputs, all_outputs Examples -------- >>> def fsq(): ... return otp.Ticks(B=[1, 1, 2, 2, 3, 3]) >>> data = otp.Tick(A=1) >>> data.state_vars['SET'] = otp.state.tick_set('oldest', 'B', otp.eval(fsq)) >>> data = data.state_vars['SET'].dump() >>> otp.run(data)[['B']] B 0 1 1 2 2 3 """ scope = _validate_and_preprocess_scope(scope) return TickSet('', insertion_policy=insertion_policy, key_fields=key_fields, obj_ref=None, default_value=default_value, scope=scope)
[docs]def tick_set_unordered(insertion_policy, key_fields, default_value=None, max_distinct_keys=-1, scope='query') -> TickSetUnordered: """ Defines unordered tick set. Parameters ---------- insertion_policy: 'oldest' or 'latest' 'oldest' specifies not to overwrite ticks with the same keys. 'latest' makes the last inserted tick overwrite the one with the same keys (if existing). key_fields: str, list of str max_distinct_keys: int maximum expected size of the set or -1, if the number of ticks is not known. default_value: :py:func:`eval query <onetick.py.eval>` Evaluated query to initialize unordered tick set from. scope: str, Scope for the state variable. Possible values are: query, branch, cross_symbol, all_inputs, all_outputs Examples -------- >>> def fsq(): ... return otp.Ticks(B=[1, 1, 2, 2, 3, 3]) >>> data = otp.Tick(A=1) >>> data.state_vars['SET'] = otp.state.tick_set_unordered('oldest', 'B', otp.eval(fsq)) >>> data = data.state_vars['SET'].dump() >>> otp.run(data)[['B']] B 0 1 1 2 2 3 """ scope = _validate_and_preprocess_scope(scope) return TickSetUnordered( '', insertion_policy=insertion_policy, max_distinct_keys=max_distinct_keys, key_fields=key_fields, obj_ref=None, default_value=default_value, scope=scope )
[docs]def tick_deque(default_value=None, scope='query') -> TickDeque: """ Defines a state tick deque. Parameters ---------- default_value: :py:func:`eval query <onetick.py.eval>` Evaluated query to initialize tick deque from. scope: str Scope for the state variable. Possible values are: query, branch, cross_symbol, all_inputs, all_outputs Examples -------- >>> def fsq(): ... return otp.Ticks(B=[1, 2, 3]) >>> data = otp.Tick(A=1) >>> data.state_vars['DEQUE'] = otp.state.tick_deque(otp.eval(fsq)) >>> data = data.state_vars['DEQUE'].dump() >>> otp.run(data)[['B']] B 0 1 1 2 2 3 """ scope = _validate_and_preprocess_scope(scope) return TickDeque('', obj_ref=None, default_value=default_value, scope=scope)
def _validate_and_preprocess_scope(scope): if isinstance(scope, str): scope = scope.upper() if scope not in AVAILABLE_SCOPES: raise ValueError(f"unknown scope {scope}, please use one of {AVAILABLE_SCOPES}") else: raise ValueError(f"scope should be one of the following strings: {AVAILABLE_SCOPES}") return scope