otp.join_with_aggregated_window

join_with_aggregated_window(agg_src, pass_src, aggregation, boundary_aggr_tick='next', pass_src_delay_msec=0, bucket_interval=0, bucket_units='seconds', output_type_index=None)

Computes one or more aggregations on the agg_src time series and joins the result with each incoming tick from the pass_src time series.

Parameters:
  • agg_src (onetick.py.Source) – Input time series to which aggregation will be applied.

  • pass_src (onetick.py.Source) – Input time series that will be joined with the aggregation result.

  • aggregation (dict) – Dictionary mapping output field names to aggregation objects, similar to the one passed to the onetick.py.Source.agg() method.

  • pass_src_delay_msec (int) –

    Specifies, in milliseconds, by how much each incoming tick from pass_src is delayed.

    The effective timestamp of a pass_src tick with timestamp T is T - pass_src_delay_msec. This parameter may be negative, in which case ticks from pass_src are joined with the aggregation result for a later timestamp.

  • boundary_aggr_tick (str) –

    Controls how agg_src ticks whose timestamp equals the effective timestamp of a pass_src tick are handled.

    If set to 'next' (the default), agg_src ticks with timestamp T - pass_src_delay_msec are not included in the aggregation joined to the pass_src tick with timestamp T. If set to 'previous', they are included.

  • bucket_interval (int) –

    Length of each bucket, in the units given by bucket_units.

    When this parameter is 0 (the default), the aggregation is computed over all agg_src ticks from the query's start time until the effective timestamp T - pass_src_delay_msec of the pass_src tick, regardless of the value of bucket_units.

  • bucket_units ('seconds', 'ticks', 'days', 'months') – Units of bucket_interval. Default is 'seconds'.

  • output_type_index (int) – Index of the input source (0 for agg_src, 1 for pass_src) whose type and properties are used for the output object. Useful when joining sources that inherit from Source. By default, the output object is of type Source.

Return type:

onetick.py.Source

Examples

>>> agg_src = otp.Ticks(A=[0, 1, 2, 3, 4, 5, 6])
>>> pass_src = otp.Ticks(B=[1, 3, 5], offset=[1, 3, 5])
>>> otp.run(agg_src)
                     Time  A
0 2003-12-01 00:00:00.000  0
1 2003-12-01 00:00:00.001  1
2 2003-12-01 00:00:00.002  2
3 2003-12-01 00:00:00.003  3
4 2003-12-01 00:00:00.004  4
5 2003-12-01 00:00:00.005  5
6 2003-12-01 00:00:00.006  6
>>> otp.run(pass_src)
                     Time  B
0 2003-12-01 00:00:00.001  1
1 2003-12-01 00:00:00.003  3
2 2003-12-01 00:00:00.005  5

By default, the aggregation is applied to the agg_src ticks in the bucket from the query start time until (but not including) the effective timestamp of the pass_src tick:

>>> agg_src = otp.Ticks(A=[0, 1, 2, 3, 4, 5, 6])
>>> pass_src = otp.Ticks(B=[1, 3, 5], offset=[1, 3, 5])
>>> data = otp.join_with_aggregated_window(
...     agg_src, pass_src, {
...         'SUM': otp.agg.sum('A'),
...         'COUNT': otp.agg.count(),
...     }
... )
>>> df = otp.run(data)
>>> print(df)
                     Time  SUM  COUNT  B
0 2003-12-01 00:00:00.001    0      1  1
1 2003-12-01 00:00:00.003    3      3  3
2 2003-12-01 00:00:00.005   10      5  5

If you want agg_src ticks whose timestamp equals the effective timestamp of the pass_src tick to be included in the bucket, set boundary_aggr_tick to 'previous':

>>> agg_src = otp.Ticks(A=[0, 1, 2, 3, 4, 5, 6])
>>> pass_src = otp.Ticks(B=[1, 3, 5], offset=[1, 3, 5])
>>> data = otp.join_with_aggregated_window(
...     agg_src, pass_src, {
...         'SUM': otp.agg.sum('A'),
...         'COUNT': otp.agg.count(),
...     },
...     boundary_aggr_tick='previous',
... )
>>> df = otp.run(data)
>>> print(df)
                     Time  SUM  COUNT  B
0 2003-12-01 00:00:00.001    1      2  1
1 2003-12-01 00:00:00.003    6      4  3
2 2003-12-01 00:00:00.005   15      6  5
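To make the bucket rules concrete, here is a minimal plain-Python sketch of the bucket_interval=0 case. It is not onetick.py code and not how OneTick implements the event processor; the function name cumulative_join and the (time_msec, value) tuple layout are hypothetical, and the sketch assumes every agg_src tick lies inside the query range and that only SUM and COUNT are computed. It restates the rules from the parameter descriptions: the effective timestamp is T - pass_src_delay_msec, 'next' leaves out agg_src ticks at that timestamp, and 'previous' keeps them.

```python
from typing import List, Tuple

# Hypothetical model of join_with_aggregated_window with bucket_interval=0.
# Illustration only; this is not the onetick.py API.


def cumulative_join(
    agg_ticks: List[Tuple[int, int]],
    pass_ticks: List[Tuple[int, int]],
    boundary_aggr_tick: str = "next",
    pass_src_delay_msec: int = 0,
) -> List[Tuple[int, int, int, int]]:
    """Return (time_msec, SUM, COUNT, pass_value) for every pass_src tick.

    agg_ticks and pass_ticks are lists of (time_msec, value) pairs.
    """
    if boundary_aggr_tick not in ("next", "previous"):
        raise ValueError("boundary_aggr_tick must be 'next' or 'previous'")
    rows = []
    for t, pass_value in pass_ticks:
        # Effective timestamp of the pass_src tick.
        effective = t - pass_src_delay_msec
        if boundary_aggr_tick == "next":
            # agg_src ticks at the effective timestamp are left out.
            bucket = [v for ts, v in agg_ticks if ts < effective]
        else:
            # 'previous': agg_src ticks at the effective timestamp are kept.
            bucket = [v for ts, v in agg_ticks if ts <= effective]
        # The output row keeps the original pass_src timestamp.
        rows.append((t, sum(bucket), len(bucket), pass_value))
    return rows


if __name__ == "__main__":
    agg = [(ms, ms) for ms in range(7)]  # A = 0..6 at 0..6 ms
    passing = [(1, 1), (3, 3), (5, 5)]   # B = 1, 3, 5 at 1, 3, 5 ms
    print(cumulative_join(agg, passing))
    # [(1, 0, 1, 1), (3, 3, 3, 3), (5, 10, 5, 5)]
    print(cumulative_join(agg, passing, "previous"))
    # [(1, 1, 2, 1), (3, 6, 4, 3), (5, 15, 6, 5)]
    print(cumulative_join(agg, passing, "previous", pass_src_delay_msec=-1))
    # [(1, 3, 3, 1), (3, 10, 5, 3), (5, 21, 7, 5)]
```

Each tuple corresponds to one row (Time, SUM, COUNT, B) of the tables in the examples; the last call mirrors the negative pass_src_delay_msec example further down.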

Set the bucket_interval and bucket_units parameters to control the size of the aggregation bucket. For example, to aggregate over buckets of two ticks:

>>> agg_src = otp.Ticks(A=[0, 1, 2, 3, 4, 5, 6])
>>> pass_src = otp.Ticks(B=[1, 3, 5], offset=[1, 3, 5])
>>> data = otp.join_with_aggregated_window(
...     agg_src, pass_src, {
...         'SUM': otp.agg.sum('A'),
...         'COUNT': otp.agg.count(),
...     },
...     boundary_aggr_tick='previous',
...     bucket_interval=2,
...     bucket_units='ticks',
... )
>>> df = otp.run(data)
>>> print(df)
                     Time  SUM  COUNT  B
0 2003-12-01 00:00:00.001    1      2  1
1 2003-12-01 00:00:00.003    5      2  3
2 2003-12-01 00:00:00.005    9      2  5
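The tick-based bucket above can be modeled in the same spirit: among the agg_src ticks admitted by the boundary rule, only the last bucket_interval of them are aggregated. This is a hypothetical plain-Python sketch (the name last_n_ticks_join is illustrative, not part of onetick.py); it assumes agg_ticks is sorted by timestamp and only aims to reproduce the numbers of the example, not OneTick's internal implementation.

```python
from typing import List, Tuple

# Hypothetical model of bucket_units='ticks'; illustration only.


def last_n_ticks_join(
    agg_ticks: List[Tuple[int, int]],
    pass_ticks: List[Tuple[int, int]],
    bucket_interval: int,
    boundary_aggr_tick: str = "next",
    pass_src_delay_msec: int = 0,
) -> List[Tuple[int, int, int, int]]:
    """Return (time_msec, SUM, COUNT, pass_value); agg_ticks must be time-sorted."""
    rows = []
    for t, pass_value in pass_ticks:
        effective = t - pass_src_delay_msec
        if boundary_aggr_tick == "next":
            eligible = [v for ts, v in agg_ticks if ts < effective]
        elif boundary_aggr_tick == "previous":
            eligible = [v for ts, v in agg_ticks if ts <= effective]
        else:
            raise ValueError("boundary_aggr_tick must be 'next' or 'previous'")
        # bucket_interval=0 means no size limit: use every eligible tick.
        bucket = eligible[-bucket_interval:] if bucket_interval > 0 else eligible
        rows.append((t, sum(bucket), len(bucket), pass_value))
    return rows


if __name__ == "__main__":
    agg = [(ms, ms) for ms in range(7)]  # A = 0..6 at 0..6 ms
    passing = [(1, 1), (3, 3), (5, 5)]   # B = 1, 3, 5 at 1, 3, 5 ms
    print(last_n_ticks_join(agg, passing, 2, "previous"))
    # [(1, 1, 2, 1), (3, 5, 2, 3), (5, 9, 2, 5)]
```

Slicing a filtered list keeps the sketch short; a streaming version would instead keep a collections.deque(maxlen=bucket_interval) of the most recent agg_src values.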

By default, the effective timestamp of a pass_src tick is the same as its original timestamp. It can be changed with the pass_src_delay_msec parameter: the effective timestamp is T - pass_src_delay_msec, and pass_src_delay_msec may be negative. A negative value shifts the end boundary of the bucket forward in time, for example:

>>> agg_src = otp.Ticks(A=[0, 1, 2, 3, 4, 5, 6])
>>> pass_src = otp.Ticks(B=[1, 3, 5], offset=[1, 3, 5])
>>> data = otp.join_with_aggregated_window(
...     agg_src, pass_src, {
...         'SUM': otp.agg.sum('A'),
...         'COUNT': otp.agg.count(),
...     },
...     boundary_aggr_tick='previous',
...     pass_src_delay_msec=-1,
... )
>>> df = otp.run(data)
>>> print(df)
                     Time  SUM  COUNT  B
0 2003-12-01 00:00:00.001    3      3  1
1 2003-12-01 00:00:00.003   10      5  3
2 2003-12-01 00:00:00.005   21      7  5

Use the output_type_index parameter to specify which input source's class is used to create the output object. This is useful when a custom user class was used as input:

>>> class CustomTick(otp.Tick):
...     def custom_method(self):
...         return 'custom_result'
>>> data1 = otp.Tick(A=1)
>>> data2 = CustomTick(B=2)
>>> data = otp.join_with_aggregated_window(
...     data1, data2, {'A': otp.agg.count()},
...     boundary_aggr_tick='previous',
...     output_type_index=1,
... )
>>> print(type(data))
<class 'onetick.py.functions.CustomTick'>
>>> print(repr(data.custom_method()))
'custom_result'
>>> print(otp.run(data))
        Time  A  B
0 2003-12-01  1  2

Use case: for each trade, compute the total volume traded in the 60 seconds that follow it (not including the trade itself):

>>> data = otp.DataSource('US_COMP', tick_type='TRD', symbols='MSFT', date=otp.dt(2022, 3, 3))
>>> otp.run(data)
                     Time  PRICE  SIZE
0 2022-03-03 00:00:00.000    1.0   100
1 2022-03-03 00:00:00.001    1.1   101
2 2022-03-03 00:00:00.002    1.2   102
3 2022-03-03 00:01:00.000    2.0   200
4 2022-03-03 00:01:00.001    2.1   201
5 2022-03-03 00:01:00.002    2.2   202
>>> data = otp.DataSource('US_COMP', tick_type='TRD', symbols='MSFT', date=otp.dt(2022, 3, 3))
>>> data = otp.join_with_aggregated_window(
...     data, data, {'VOLUME': otp.agg.sum('SIZE')},
...     boundary_aggr_tick='next',
...     pass_src_delay_msec=-60000,
...     bucket_interval=60, bucket_units='seconds',
... )
>>> df = otp.run(data)
>>> print(df)
                     Time  VOLUME  PRICE  SIZE
0 2022-03-03 00:00:00.000     203    1.0   100
1 2022-03-03 00:00:00.001     302    1.1   101
2 2022-03-03 00:00:00.002     401    1.2   102
3 2022-03-03 00:01:00.000     403    2.0   200
4 2022-03-03 00:01:00.001     202    2.1   201
5 2022-03-03 00:01:00.002       0    2.2   202
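The arithmetic behind this use case can be checked with a small plain-Python sketch. With pass_src_delay_msec=-60000, the effective timestamp of a trade at T is T + 60000, so a 60-second bucket ending there starts at T. The sketch assumes, from the "not including the trade itself" wording and the output above, that the bucket excludes its start (the trade itself) and, because boundary_aggr_tick='next', its end; forward_volume is a hypothetical name, timestamps are milliseconds since midnight, and this is not the OneTick implementation.

```python
from typing import List, Tuple

# Hypothetical model of the 60-second forward-volume use case; illustration only.


def forward_volume(
    trades: List[Tuple[int, int]],
    bucket_msec: int = 60_000,
    pass_src_delay_msec: int = -60_000,
) -> List[Tuple[int, int]]:
    """Return (time_msec, VOLUME) for each (time_msec, SIZE) trade."""
    rows = []
    for t, _size in trades:
        effective = t - pass_src_delay_msec  # T + 60000 with the defaults
        start = effective - bucket_msec      # equals T with the defaults
        # Assumed bounds: strictly after start, strictly before effective.
        volume = sum(size for ts, size in trades if start < ts < effective)
        rows.append((t, volume))
    return rows


if __name__ == "__main__":
    trades = [(0, 100), (1, 101), (2, 102), (60_000, 200), (60_001, 201), (60_002, 202)]
    print(forward_volume(trades))
    # [(0, 203), (1, 302), (2, 401), (60000, 403), (60001, 202), (60002, 0)]
```

The nested loop is quadratic in the number of trades; for long series, a two-pointer sweep over the time-sorted trades yields the same sums in linear time.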

See also

JOIN_WITH_AGGREGATED_WINDOW OneTick event processor