otp.Source.agg

Source.agg(aggs, running=False, all_fields=False, bucket_interval=0, bucket_time='end', bucket_units='seconds', bucket_end_condition=None, end_condition_per_group=False, boundary_tick_bucket='new', group_by=None)

Applies a composition of otp.agg aggregations to the source.

Parameters
  • aggs (dict of aggregations) – Dictionary of aggregations: each key is an output column name and each value is the aggregation to compute.

  • running (bool) –

    Whether the aggregation is calculated over a sliding window. Together, running and bucket_interval determine when new buckets are created.

    • running = True

      The aggregation is calculated over a sliding window.

      • bucket_interval = N (N > 0)

        The window size is N. An output tick is generated when a tick enters the window (arrival event) and when it exits the window (exit event).

      • bucket_interval = 0

        The left boundary of the window is fixed at the query start time. For each tick, the aggregation is calculated over [start_time; tick_t].

    • running = False

      Buckets partition the [query start time, query end time) interval into non-overlapping intervals of size bucket_interval (the last interval may be smaller). If bucket_interval is set to 0, a single bucket is created for the entire interval.

    Default: False, which creates fully independent buckets. The number of buckets is (end - start) / bucket_interval, rounded up when the last bucket is shorter.

  • all_fields (Literal[True, False, 'first', 'last', 'high', 'low']) –

    • If all_fields is False, the output tick has only the aggregation fields.

    • If all_fields is True, all fields from the input ticks are also included, using the values from the first tick in the bucket.

    • If all_fields is set to “first”, “last”, “high”, or “low”, that tick selection policy is used for the values of all fields.

  • bucket_interval (int) – Determines the length of each bucket (the units depend on bucket_units).

  • bucket_time (Literal['start', 'end']) –

    Controls the output timestamp.

    • start

      The timestamp assigned to the bucket is the start time of the bucket.

    • end

      The timestamp assigned to the bucket is the end time of the bucket.

  • bucket_units (Literal['seconds', 'ticks', 'days', 'months', 'flexible']) –

    Sets the units of bucket_interval.

    If set to “flexible”, bucket_end_condition must be set.

  • bucket_end_condition (condition) – An expression that is evaluated on every tick. If it evaluates to “True”, a new bucket is created. This parameter is used only if bucket_units is set to “flexible”.

  • end_condition_per_group (bool) –

    Controls how bucket_end_condition is applied across groups.

    • end_condition_per_group = True

      bucket_end_condition is applied only to the group defined by group_by.

    • end_condition_per_group = False

      bucket_end_condition is applied across all groups.

    This parameter is used only if bucket_units is set to “flexible”.

  • boundary_tick_bucket (Literal['new', 'previous']) –

    Controls which bucket the boundary tick belongs to.

    • previous

      A tick on which bucket_end_condition evaluates to “True” belongs to the bucket being closed.

    • new

      A tick on which bucket_end_condition evaluates to “True” belongs to the new bucket.

    This parameter is used only if bucket_units is set to “flexible”.

  • group_by (list, str or expression) – When specified, each bucket is broken further into additional sub-buckets based on specified field values.
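
The interaction of bucket_end_condition and boundary_tick_bucket can be pictured with a plain-Python sketch. It does not call OneTick: the helper split_flexible and the sample values are hypothetical, and the code only mirrors the rule described above, where a tick for which the condition holds closes the current bucket and is placed either in the new bucket ('new') or in the bucket being closed ('previous'). Skipping empty buckets is a simplification made here, not documented behaviour.

```python
from typing import Callable, List


def split_flexible(values: List[int],
                   end_condition: Callable[[int], bool],
                   boundary_tick_bucket: str = "new") -> List[List[int]]:
    """Split values into buckets, closing a bucket when end_condition is true."""
    if boundary_tick_bucket not in ("new", "previous"):
        raise ValueError("boundary_tick_bucket must be 'new' or 'previous'")
    buckets: List[List[int]] = []
    current: List[int] = []
    for value in values:
        if end_condition(value):
            if boundary_tick_bucket == "previous":
                # The boundary tick is the last tick of the bucket being closed.
                current.append(value)
                buckets.append(current)
                current = []
            else:
                # The boundary tick is the first tick of the new bucket.
                if current:
                    buckets.append(current)
                current = [value]
        else:
            current.append(value)
    if current:
        buckets.append(current)  # ticks left over when the input ends
    return buckets


ticks = [1, 2, 10, 3, 4, 10, 5]          # hypothetical X values


def is_boundary(x: int) -> bool:
    return x == 10                        # stands in for bucket_end_condition


new_sums = [sum(b) for b in split_flexible(ticks, is_boundary, "new")]
previous_sums = [sum(b) for b in split_flexible(ticks, is_boundary, "previous")]
print(new_sums)       # [3, 17, 15]
print(previous_sums)  # [13, 17, 5]
```

With 'new' the boundary value 10 opens each later bucket; with 'previous' it is counted in the bucket it closes, which is why the first sum changes from 3 to 13.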

Return type

onetick.py.core.source.Source

Examples

By default, all of the data is aggregated into a single bucket:

>>> data = otp.Ticks(X=[1, 2, 3, 4])
>>> data = data.agg({'X_SUM': otp.agg.sum('X')})
>>> otp.run(data)
        Time  X_SUM
0 2003-12-04     10
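
When group_by is given, the same aggregation is computed separately for each distinct value of the grouping field. A minimal plain-Python sketch of that idea follows; it makes no OneTick calls, and the field names SYMBOL and X and the data are made up for illustration.

```python
from collections import defaultdict

# Hypothetical input ticks; in onetick.py these would come from a Source.
ticks = [
    {"SYMBOL": "A", "X": 1},
    {"SYMBOL": "B", "X": 2},
    {"SYMBOL": "A", "X": 3},
    {"SYMBOL": "B", "X": 4},
]

# One total per distinct value of the grouping field.
x_sum = defaultdict(int)
for tick in ticks:
    x_sum[tick["SYMBOL"]] += tick["X"]

result = dict(x_sum)
print(result)  # {'A': 4, 'B': 6}
```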

Multiple aggregations can be applied at the same time:

>>> data = otp.Ticks(X=[1, 2, 3, 4])
>>> data = data.agg({'X_SUM': otp.agg.sum('X'),
...                  'X_MEAN': otp.agg.average('X')})
>>> otp.run(data)
        Time  X_SUM  X_MEAN
0 2003-12-04     10     2.5

Aggregation can be used in running mode:

>>> data = otp.Ticks(X=[1, 2, 3, 4])
>>> data = data.agg({'CUM_SUM': otp.agg.sum('X')}, running=True)
>>> otp.run(data)
                     Time  CUM_SUM
0 2003-12-01 00:00:00.000        1
1 2003-12-01 00:00:00.001        3
2 2003-12-01 00:00:00.002        6
3 2003-12-01 00:00:00.003       10
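
With running=True and bucket_interval=0, the left edge of the window stays at the query start time, so each output value aggregates every tick seen so far. For a sum this is an ordinary cumulative sum, which the following plain-Python sketch reproduces for the same input values (it does not use OneTick):

```python
from itertools import accumulate

x = [1, 2, 3, 4]

# Each element is the sum over [start_time, tick_time], i.e. a cumulative sum.
cum_sum = list(accumulate(x))
print(cum_sum)  # [1, 3, 6, 10]
```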

Aggregation can be split into buckets:

>>> data = otp.Ticks(X=[1, 2, 3, 4])
>>> data = data.agg({'X_SUM': otp.agg.sum('X')}, bucket_interval=2, bucket_units='ticks')
>>> otp.run(data)
                     Time  X_SUM
0 2003-12-01 00:00:00.001      3
1 2003-12-01 00:00:00.003      7
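
The example above buckets by tick count. For time-based buckets with running=False, the parameter description says that [query start time, query end time) is partitioned into intervals of size bucket_interval, with a possibly shorter last interval, and that bucket_time picks which edge becomes the output timestamp. A rough plain-Python sketch of that partitioning, using a hypothetical helper time_buckets and integer times rather than any OneTick call:

```python
from typing import List, Tuple


def time_buckets(start: int, end: int, bucket_interval: int) -> List[Tuple[int, int]]:
    """Partition [start, end) into consecutive buckets of bucket_interval."""
    if bucket_interval < 0:
        raise ValueError("bucket_interval must be non-negative")
    if bucket_interval == 0:
        return [(start, end)]  # a single bucket for the whole interval
    edges = []
    left = start
    while left < end:
        right = min(left + bucket_interval, end)  # last bucket may be shorter
        edges.append((left, right))
        left = right
    return edges


buckets = time_buckets(start=0, end=10, bucket_interval=4)
print(buckets)  # [(0, 4), (4, 8), (8, 10)]

# bucket_time selects which edge becomes the output timestamp.
end_stamps = [right for _, right in buckets]   # bucket_time='end'
start_stamps = [left for left, _ in buckets]   # bucket_time='start'
print(end_stamps)    # [4, 8, 10]
print(start_stamps)  # [0, 4, 8]
```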

Running aggregation can also be combined with bucket_interval, which then sets the size of the sliding window:

>>> data = otp.Ticks(X=[1, 2, 3, 4])
>>> data = data.agg(dict(X_MEAN=otp.agg.average("X"),
...                      X_STD=otp.agg.stddev("X")),
...                 running=True, bucket_interval=2)
>>> otp.run(data)
                     Time  X_MEAN     X_STD
0 2003-12-01 00:00:00.000     1.0  0.000000
1 2003-12-01 00:00:00.001     1.5  0.500000
2 2003-12-01 00:00:00.002     2.0  0.816497
3 2003-12-01 00:00:00.003     2.5  1.118034
4 2003-12-01 00:00:02.000     3.0  0.816497
5 2003-12-01 00:00:02.001     3.5  0.500000
6 2003-12-01 00:00:02.002     4.0  0.000000
7 2003-12-01 00:00:02.003     NaN       NaN
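
The arrival and exit events behind the X_MEAN column can be traced with a small plain-Python sketch. It is only an illustration and does not call OneTick: the helper sliding_mean is hypothetical, it assumes ticks 1 ms apart as in the output timestamps above, it assumes a tick arriving at time t leaves the window at t + window, and it prints None where the table shows NaN.

```python
from statistics import mean
from typing import List, Optional, Tuple


def sliding_mean(ticks: List[Tuple[int, float]],
                 window: int) -> List[Tuple[int, Optional[float]]]:
    """Emit the window mean on every arrival and every exit event.

    ticks are (time, value) pairs; a tick that arrives at time t is
    assumed to leave the window at time t + window.
    """
    events = []
    for index, (time, _) in enumerate(ticks):
        events.append((time, "arrival", index))
        events.append((time + window, "exit", index))
    events.sort(key=lambda event: event[0])  # the sample data has no ties

    inside = set()  # indexes of the ticks currently in the window
    output = []
    for time, kind, index in events:
        if kind == "arrival":
            inside.add(index)
        else:
            inside.discard(index)
        values = [ticks[i][1] for i in sorted(inside)]
        output.append((time, mean(values) if values else None))
    return output


# Times in milliseconds: four ticks 1 ms apart and a 2-second window.
ticks = [(0, 1), (1, 2), (2, 3), (3, 4)]
result = sliding_mean(ticks, window=2000)
for time, value in result:
    print(time, value)
```

The first four rows come from arrivals (means 1, 1.5, 2, 2.5) and the last four from exits two seconds later (means 3, 3.5, 4, and an empty window).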

See also

COMPUTE OneTick event processor