otp.Source.agg#
- Source.agg(aggs, running=False, all_fields=False, bucket_interval=0, bucket_time='end', bucket_units='seconds', bucket_end_condition=None, end_condition_per_group=False, boundary_tick_bucket='new', group_by=None)[source]#
 Applies composition of otp.agg aggregations
- Parameters
 aggs (dict of aggregations) – aggregation dict: key - output column name; value - aggregation
running (bool) –
Aggregation will be calculated as sliding window.
runningandbucket_intervalparameters determines when new buckets are created.running= Trueaggregation will be calculated in a sliding window.
bucket_interval= N (N > 0)Window size will be N. Output tick wil be generated when tick “enter” window (arrival event) and when “exit” window (exit event)
bucket_interval= 0Left boundary of window will be binded to start time. For each tick aggregation will be calculated in [start_time; tick_t].
running= Falsebuckets partition the [query start time, query end time) interval into non-overlapping intervals of size
bucket_interval(with the last interval possibly of a smaller size). Ifbucket_intervalis set to 0 a single bucket for the entire interval is created.
Default: False - create totally independent buckets. Number of buckets = (end - start) / bucket_interval’)
all_fields (Literal[True, False, 'first', 'last', 'high', 'low']) –
If
all_fieldsFalse - output tick will have only aggregation fields.If
all_fieldsTrue - also include all fields from the input ticks by using values from first_tick in bucket.If
all_fieldsset to “first”, “last”, “high”, or “low” - explicitly set tick selection policy for all fields values.
bucket_interval (int) – Determines the length of each bucket (units depends on
bucket_units).bucket_time (Literal['start', 'end']) –
Control output timestamp.
start
the timestamp assigned to the bucket is the start time of the bucket.
end
the timestamp assigned to the bucket is the end time of the bucket.
bucket_units (Literal['seconds', 'ticks', 'days', 'months', 'flexible']) –
Set bucket interval units.
If set to flexible
bucket_end_criteriamust be set.bucket_end_condition (condition) – An expression that is evaluated on every tick. If it evaluates to “True”, then a new bucket is created. This parameter is only used if
bucket_unitsis set to “flexible”end_condition_per_group (bool) –
Controls application of
bucket_end_conditionin groups.end_condition_per_group= Truebucket_end_conditionis applied only to the group defined bygroup_byend_condition_per_group= Falsebucket_end_conditionapplied across all groups
This parameter is only used if
bucket_unitsis set to “flexible”boundary_tick_bucket (Literal['new', 'previous']) –
Controls boundary tick ownership.
previous
A tick on which
bucket_end_conditionevaluates to “true” belongs to the bucket being closed.new
tick belongs to the new bucket.
This parameter is only used if
bucket_unitsis set to “flexible”group_by (list, str or expression) – When specified, each bucket is broken further into additional sub-buckets based on specified field values.
- Return type
 
Examples
By default the whole data is aggregated:
>>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'X_SUM': otp.agg.sum('X')}) >>> otp.run(data) Time X_SUM 0 2003-12-04 10
Multiple aggregations can be applied at the same time:
>>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'X_SUM': otp.agg.sum('X'), ... 'X_MEAN': otp.agg.average('X')}) >>> otp.run(data) Time X_SUM X_MEAN 0 2003-12-04 10 2.5
Aggregation can be used in running mode:
>>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'CUM_SUM': otp.agg.sum('X')}, running=True) >>> otp.run(data) Time CUM_SUM 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.001 3 2 2003-12-01 00:00:00.002 6 3 2003-12-01 00:00:00.003 10
Aggregation can be split in buckets:
>>> data = otp.Ticks(X=[1, 2, 3, 4]) >>> data = data.agg({'X_SUM': otp.agg.sum('X')}, bucket_interval=2, bucket_units='ticks') >>> otp.run(data) Time X_SUM 0 2003-12-01 00:00:00.001 3 1 2003-12-01 00:00:00.003 7
Running aggregation can be used with buckets too:
>>> data = otp.Ticks(X=[1, 2, 3, 4], offsets=[0, 1000, 1500, 3000]) >>> data = data.agg(dict(X_MEAN=otp.agg.average("X"), ... X_STD=otp.agg.stddev("X")), ... running=True, bucket_interval=2) >>> otp.run(data) Time X_MEAN X_STD 0 2003-12-01 00:00:00.000 1.0 0.000000 1 2003-12-01 00:00:00.001 1.5 0.500000 2 2003-12-01 00:00:00.002 2.0 0.816497 3 2003-12-01 00:00:00.003 2.5 1.118034 4 2003-12-01 00:00:02.000 3.0 0.816497 5 2003-12-01 00:00:02.001 3.5 0.500000 6 2003-12-01 00:00:02.002 4.0 0.000000 7 2003-12-01 00:00:02.003 NaN NaN
See also
COMPUTE OneTick event processor