otp.Source.agg#

Source.agg(aggs, running=False, all_fields=False, bucket_interval=0, bucket_time='end', bucket_units='seconds', bucket_end_condition=None, end_condition_per_group=False, boundary_tick_bucket='new', group_by=None)[source]#

Applies composition of otp.agg aggregations

Parameters
  • aggs (dict of aggregations) – aggregation dict: key - output column name; value - aggregation

  • running (bool) –

    Aggregation will be calculated as sliding window. running and bucket_interval parameters determines when new buckets are created.

    • running = True

      aggregation will be calculated in a sliding window.

      • bucket_interval = N (N > 0)

        Window size will be N. Output tick will be generated when tick “enter” window (arrival event) and when “exit” window (exit event)

      • bucket_interval = 0

        Left boundary of window will be bound to start time. For each tick aggregation will be calculated in [start_time; tick_t].

    • running = False

      buckets partition the [query start time, query end time) interval into non-overlapping intervals of size bucket_interval (with the last interval possibly of a smaller size). If bucket_interval is set to 0 a single bucket for the entire interval is created.

      Note that in non-running mode OneTick unconditionally divides the whole time interval into specified number of buckets. It means that you will always get this specified number of ticks in the result, even if you have less ticks in the input data.

    Default: False

  • all_fields (Literal[True, False, 'first', 'last', 'high', 'low']) –

    • If all_fields False - output tick will have only aggregation fields.

    • If all_fields False and running True - output ticks are created when a tick enters or leaves the

    sliding window.

    • If all_fields True - an output tick is generated only for arrival events, but all attributes from the input

    tick causing an arrival event are copied over to the output tick and the aggregation is added as another attribute.

    • If all_fields set to “first”, “last”, “high”, or “low” - explicitly set tick selection policy for all fields values. For “high” and “low” “PRICE” field will be selected as an input. Otherwise, you will get the runtime error. If all_fields is set to one of these values, running can’t be True.

  • bucket_interval (int) – Determines the length of each bucket (units depends on bucket_units).

  • bucket_time (Literal['start', 'end']) –

    Control output timestamp.

    • start

      the timestamp assigned to the bucket is the start time of the bucket.

    • end

      the timestamp assigned to the bucket is the end time of the bucket.

  • bucket_units (Literal['seconds', 'ticks', 'days', 'months', 'flexible']) –

    Set bucket interval units.

    If set to flexible then bucket_end_criteria must be set.

    Note that seconds bucket unit doesn’t take into account daylight-saving time of the timezone, so you may not get expected results when using, for example, 24 * 60 * 60 seconds as bucket interval. In such case use days bucket unit instead. See example in onetick.py.agg.sum().

  • bucket_end_condition (condition) – An expression that is evaluated on every tick. If it evaluates to “True”, then a new bucket is created. This parameter is only used if bucket_units is set to “flexible”

  • end_condition_per_group (bool) –

    Controls application of bucket_end_condition in groups.

    • end_condition_per_group = True

      bucket_end_condition is applied only to the group defined by group_by

    • end_condition_per_group = False

      bucket_end_condition applied across all groups

    This parameter is only used if bucket_units is set to “flexible”

  • boundary_tick_bucket (Literal['new', 'previous']) –

    Controls boundary tick ownership.

    • previous

      A tick on which bucket_end_condition evaluates to “true” belongs to the bucket being closed.

    • new

      tick belongs to the new bucket.

    This parameter is only used if bucket_units is set to “flexible”

  • group_by (list, str or expression) – When specified, each bucket is broken further into additional sub-buckets based on specified field values. If Operation is used then GROUP_{i} column is added. Where i is index in group_by list. For example, if Operation is the only element in group_by list then GROUP_0 field will be added.

Return type

onetick.py.core.source.Source

Examples

By default the whole data is aggregated:

>>> data = otp.Ticks(X=[1, 2, 3, 4])
>>> data = data.agg({'X_SUM': otp.agg.sum('X')})
>>> otp.run(data)
        Time  X_SUM
0 2003-12-04     10

Multiple aggregations can be applied at the same time:

>>> data = otp.Ticks(X=[1, 2, 3, 4])
>>> data = data.agg({'X_SUM': otp.agg.sum('X'),
...                  'X_MEAN': otp.agg.average('X')})
>>> otp.run(data)
        Time  X_SUM  X_MEAN
0 2003-12-04     10     2.5

Aggregation can be used in running mode:

>>> data = otp.Ticks(X=[1, 2, 3, 4])
>>> data = data.agg({'CUM_SUM': otp.agg.sum('X')}, running=True)
>>> otp.run(data)
                     Time  CUM_SUM
0 2003-12-01 00:00:00.000        1
1 2003-12-01 00:00:00.001        3
2 2003-12-01 00:00:00.002        6
3 2003-12-01 00:00:00.003       10

Aggregation can be split in buckets:

>>> data = otp.Ticks(X=[1, 2, 3, 4])
>>> data = data.agg({'X_SUM': otp.agg.sum('X')}, bucket_interval=2, bucket_units='ticks')
>>> otp.run(data)
                     Time  X_SUM
0 2003-12-01 00:00:00.001      3
1 2003-12-01 00:00:00.003      7

Running aggregation can be used with buckets too. In this case (all_fields=False and running=True) output ticks are created when a tick enters or leaves the sliding window (that’s why for this example there are 8 output ticks for 4 input ticks):

>>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3600])
>>> data = data.agg(dict(X_MEAN=otp.agg.average("X"),
...                      X_STD=otp.agg.stddev("X")),
...                 running=True, bucket_interval=2)
>>> otp.run(data)
                     Time  X_MEAN     X_STD
0 2003-12-01 00:00:00.000     1.0  0.000000
1 2003-12-01 00:00:01.000     1.5  0.500000
2 2003-12-01 00:00:01.500     2.0  0.816497
3 2003-12-01 00:00:02.000     2.5  0.500000
4 2003-12-01 00:00:03.000     3.0  0.000000
5 2003-12-01 00:00:03.500     NaN       NaN
6 2003-12-01 00:00:03.600     4.0  0.000000
7 2003-12-01 00:00:05.600     NaN       NaN

If all_fields=True an output tick is generated only for arrival events, but all attributes from the input tick causing an arrival event are copied over to the output tick and the aggregation is added as another attribute:

>>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3600])
>>> data = data.agg(dict(X_MEAN=otp.agg.average("X"),
...                      X_STD=otp.agg.stddev("X")),
...                 all_fields=True, running=True)
>>> otp.run(data)
                     Time  X  X_MEAN     X_STD
0 2003-12-01 00:00:00.000  1     1.0  0.000000
1 2003-12-01 00:00:01.000  2     1.5  0.500000
2 2003-12-01 00:00:01.500  3     2.0  0.816497
3 2003-12-01 00:00:03.600  4     2.5  1.118034

See also

COMPUTE OneTick event processor