otp.join_with_aggregated_window
- join_with_aggregated_window(agg_src, pass_src, aggregation, boundary_aggr_tick='next', pass_src_delay_msec=0, bucket_interval=0, bucket_units='seconds', output_type_index=None)
Computes one or more aggregations on the agg_src time series and joins the result with each incoming tick from the pass_src time series.
- Parameters
  - agg_src (onetick.py.Source) – Input time series to which aggregation will be applied.
  - pass_src (onetick.py.Source) – Input time series that will be joined with the aggregation result.
  - aggregation (dict) – Dictionary with aggregation output field names and aggregation objects, similar to the one passed to the onetick.py.Source.agg() method.
  - pass_src_delay_msec (int) – Specifies by how much any incoming tick from pass_src is delayed. The effective timestamp of a tick from pass_src with timestamp T is T - pass_src_delay_msec. This parameter may be negative, in which case ticks from pass_src will be joined with the aggregation result of a later timestamp.
  - boundary_aggr_tick (str) – Controls the logic of joining ticks with the same timestamp. If set to 'next', ticks from agg_src with the same timestamp (+pass_src_delay_msec) as the latest ticks from pass_src will not be included in that tick's joined aggregation.
  - bucket_interval (int) – Determines the length of each bucket (units depend on bucket_units). When this parameter is set to 0 (the default), the aggregation is computed over all ticks from the query's start time until the effective timestamp of the pass_src tick, T - pass_src_delay_msec, regardless of the value of bucket_units.
  - bucket_units ('seconds', 'ticks', 'days', 'months') – Sets the bucket interval units.
  - output_type_index (int) – Specifies the index of the source, between agg_src and pass_src, from which the type and properties of the output object will be taken. Useful when merging sources that inherit from Source. By default, the output object type will be Source.
- Return type
Examples
    >>> import onetick.py as otp
    >>> agg_src = otp.Ticks(A=[0, 1, 2, 3, 4, 5, 6])
    >>> pass_src = otp.Ticks(B=[1, 3, 5], offset=[1, 3, 5])
    >>> otp.run(agg_src)
                         Time  A
    0 2003-12-01 00:00:00.000  0
    1 2003-12-01 00:00:00.001  1
    2 2003-12-01 00:00:00.002  2
    3 2003-12-01 00:00:00.003  3
    4 2003-12-01 00:00:00.004  4
    5 2003-12-01 00:00:00.005  5
    6 2003-12-01 00:00:00.006  6
    >>> otp.run(pass_src)
                         Time  B
    0 2003-12-01 00:00:00.001  1
    1 2003-12-01 00:00:00.003  3
    2 2003-12-01 00:00:00.005  5
By default the aggregation is applied to the ticks from agg_src in the bucket from the query start time until (but not including) the effective timestamp of the tick from pass_src:

    agg_src = otp.Ticks(A=[0, 1, 2, 3, 4, 5, 6])
    pass_src = otp.Ticks(B=[1, 3, 5], offset=[1, 3, 5])
    data = otp.join_with_aggregated_window(
        agg_src, pass_src, {
            'SUM': otp.agg.sum('A'),
            'COUNT': otp.agg.count(),
        }
    )
    df = otp.run(data)
    print(df)

                         Time  SUM  COUNT  B
    0 2003-12-01 00:00:00.001    0      1  1
    1 2003-12-01 00:00:00.003    3      3  3
    2 2003-12-01 00:00:00.005   10      5  5
If you want ticks from agg_src whose timestamp equals the effective timestamp of the tick from pass_src to be included in the bucket, set boundary_aggr_tick to 'previous':

    agg_src = otp.Ticks(A=[0, 1, 2, 3, 4, 5, 6])
    pass_src = otp.Ticks(B=[1, 3, 5], offset=[1, 3, 5])
    data = otp.join_with_aggregated_window(
        agg_src, pass_src, {
            'SUM': otp.agg.sum('A'),
            'COUNT': otp.agg.count(),
        },
        boundary_aggr_tick='previous',
    )
    df = otp.run(data)
    print(df)

                         Time  SUM  COUNT  B
    0 2003-12-01 00:00:00.001    1      2  1
    1 2003-12-01 00:00:00.003    6      4  3
    2 2003-12-01 00:00:00.005   15      6  5
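The two results above differ only in how the agg_src tick that falls exactly on the pass_src timestamp is treated. The following plain-Python sketch models that rule by hand so the SUM and COUNT columns can be checked. It does not use onetick.py, the helper aggregate_until is hypothetical, and timestamps are assumed to be millisecond offsets from the query start:

```python
# Illustrative model only: plain Python, no onetick.py involved.
# Ticks are (timestamp_msec, value) pairs; aggregate_until is a hypothetical helper.

def aggregate_until(agg_ticks, pass_ts, boundary_aggr_tick='next'):
    """Return (SUM, COUNT) over agg ticks from the query start up to pass_ts."""
    if boundary_aggr_tick == 'next':
        # A tick exactly on the boundary is left out of this window.
        window = [value for ts, value in agg_ticks if ts < pass_ts]
    elif boundary_aggr_tick == 'previous':
        # A tick exactly on the boundary is counted in this window.
        window = [value for ts, value in agg_ticks if ts <= pass_ts]
    else:
        raise ValueError("boundary_aggr_tick must be 'next' or 'previous'")
    return sum(window), len(window)


agg_ticks = [(ts, ts) for ts in range(7)]  # A = 0..6 at offsets 0..6 msec
pass_timestamps = [1, 3, 5]                # offsets of the pass_src ticks

next_rows = [aggregate_until(agg_ticks, t, 'next') for t in pass_timestamps]
previous_rows = [aggregate_until(agg_ticks, t, 'previous') for t in pass_timestamps]

print(next_rows)      # [(0, 1), (3, 3), (10, 5)]
print(previous_rows)  # [(1, 2), (6, 4), (15, 6)]
```

Each pair is (SUM, COUNT) for one pass_src tick, in the same order as the rows of the tables above.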
Set the bucket_interval and bucket_units parameters to control the size of the aggregation bucket. For example, to aggregate over buckets of two ticks:

    agg_src = otp.Ticks(A=[0, 1, 2, 3, 4, 5, 6])
    pass_src = otp.Ticks(B=[1, 3, 5], offset=[1, 3, 5])
    data = otp.join_with_aggregated_window(
        agg_src, pass_src, {
            'SUM': otp.agg.sum('A'),
            'COUNT': otp.agg.count(),
        },
        boundary_aggr_tick='previous',
        bucket_interval=2,
        bucket_units='ticks',
    )
    df = otp.run(data)
    print(df)

                         Time  SUM  COUNT  B
    0 2003-12-01 00:00:00.001    1      2  1
    1 2003-12-01 00:00:00.003    5      2  3
    2 2003-12-01 00:00:00.005    9      2  5
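With bucket_units='ticks', the numbers above are consistent with aggregating only the most recent bucket_interval ticks that fall inside the window. A plain-Python sketch of that reading follows. The helper last_n_ticks is hypothetical, the 'previous' boundary rule from the example is assumed, and the code illustrates the documented output rather than how OneTick computes it:

```python
# Illustrative model only: plain Python, no onetick.py involved.
# Ticks are (timestamp_msec, value) pairs; last_n_ticks is a hypothetical helper.

def last_n_ticks(agg_ticks, pass_ts, n):
    """Return (SUM, COUNT) over the n latest agg ticks with timestamp <= pass_ts."""
    if n <= 0:
        raise ValueError("n must be positive in this sketch")
    # 'previous' boundary rule: a tick exactly at pass_ts is eligible.
    eligible = [value for ts, value in sorted(agg_ticks) if ts <= pass_ts]
    window = eligible[-n:]  # keep only the n most recent eligible ticks
    return sum(window), len(window)


agg_ticks = [(ts, ts) for ts in range(7)]  # A = 0..6 at offsets 0..6 msec

rows = [last_n_ticks(agg_ticks, t, n=2) for t in (1, 3, 5)]
print(rows)  # [(1, 2), (5, 2), (9, 2)]
```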
By default the effective timestamp of a tick from pass_src is the same as its original timestamp. It can be changed with the pass_src_delay_msec parameter. The effective timestamp is calculated as T - pass_src_delay_msec, and pass_src_delay_msec can also be negative. This makes it possible to shift the end boundary of the bucket:

    agg_src = otp.Ticks(A=[0, 1, 2, 3, 4, 5, 6])
    pass_src = otp.Ticks(B=[1, 3, 5], offset=[1, 3, 5])
    data = otp.join_with_aggregated_window(
        agg_src, pass_src, {
            'SUM': otp.agg.sum('A'),
            'COUNT': otp.agg.count(),
        },
        boundary_aggr_tick='previous',
        pass_src_delay_msec=-1,
    )
    df = otp.run(data)
    print(df)

                         Time  SUM  COUNT  B
    0 2003-12-01 00:00:00.001    3      3  1
    1 2003-12-01 00:00:00.003   10      5  3
    2 2003-12-01 00:00:00.005   21      7  5
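To see where these numbers come from, the shift can be worked out by hand. The sketch below is plain Python with a hypothetical helper effective_timestamp. It assumes millisecond offsets as timestamps and the 'previous' boundary rule used in the example:

```python
# Illustrative model only: plain Python, no onetick.py involved.
# effective_timestamp is a hypothetical helper mirroring T - pass_src_delay_msec.

def effective_timestamp(pass_ts, pass_src_delay_msec=0):
    """Effective timestamp of a pass_src tick with original timestamp pass_ts."""
    return pass_ts - pass_src_delay_msec


agg_ticks = [(ts, ts) for ts in range(7)]  # A = 0..6 at offsets 0..6 msec

rows = []
for pass_ts in (1, 3, 5):
    # A delay of -1 msec moves the window end one millisecond later.
    end = effective_timestamp(pass_ts, pass_src_delay_msec=-1)
    # 'previous' boundary rule: the tick exactly at the window end is included.
    window = [value for ts, value in agg_ticks if ts <= end]
    rows.append((end, sum(window), len(window)))

print(rows)  # [(2, 3, 3), (4, 10, 5), (6, 21, 7)]
```

Each tuple is (effective timestamp, SUM, COUNT). The output rows keep the original pass_src timestamps, as the Time column above shows; only the window end moves.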
Use the output_type_index parameter to specify which input class is used to create the output object. This may be useful when a custom user class is used as input:

    class CustomTick(otp.Tick):
        def custom_method(self):
            return 'custom_result'

    data1 = otp.Tick(A=1)
    data2 = CustomTick(B=2)
    data = otp.join_with_aggregated_window(
        data1, data2, {'A': otp.agg.count()},
        boundary_aggr_tick='previous',
        output_type_index=1,
    )
    print(type(data))
    print(repr(data.custom_method()))
    print(otp.run(data))

    <class 'onetick.py.functions.CustomTick'>
    'custom_result'
            Time  A  B
    0 2003-12-01  1  2
Use-case: check the volume in the 60 seconds following this trade (not including this trade):
    >>> data = otp.DataSource('NYSE_TAQ', tick_type='TRD', symbols='MSFT', date=otp.dt(2022, 3, 3))
    >>> otp.run(data)
                         Time  PRICE  SIZE
    0 2022-03-03 00:00:00.000    1.0   100
    1 2022-03-03 00:00:00.001    1.1   101
    2 2022-03-03 00:00:00.002    1.2   102
    3 2022-03-03 00:01:00.000    2.0   200
    4 2022-03-03 00:01:00.001    2.1   201
    5 2022-03-03 00:01:00.002    2.2   202

    data = otp.DataSource('NYSE_TAQ', tick_type='TRD', symbols='MSFT', date=otp.dt(2022, 3, 3))
    data = otp.join_with_aggregated_window(
        data, data, {'VOLUME': otp.agg.sum('SIZE')},
        boundary_aggr_tick='next',
        pass_src_delay_msec=-60000,
        bucket_interval=60,
        bucket_units='seconds',
    )
    df = otp.run(data)
    print(df)

                         Time  VOLUME  PRICE  SIZE
    0 2022-03-03 00:00:00.000     203    1.0   100
    1 2022-03-03 00:00:00.001     302    1.1   101
    2 2022-03-03 00:00:00.002     401    1.2   102
    3 2022-03-03 00:01:00.000     403    2.0   200
    4 2022-03-03 00:01:00.001     202    2.1   201
    5 2022-03-03 00:01:00.002       0    2.2   202
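The VOLUME column can be checked by hand. With pass_src_delay_msec=-60000 the window for a trade at time T ends at T + 60000 msec, and a 60-second bucket then starts at T. The plain-Python sketch below assumes both ends of the window are exclusive, which is what reproduces the table above. The helper volume_after is hypothetical and not part of onetick.py:

```python
# Illustrative model only: plain Python, no onetick.py involved.
# Trades are (timestamp_msec, size) pairs taken from the sample data above.

trades = [
    (0, 100), (1, 101), (2, 102),
    (60_000, 200), (60_001, 201), (60_002, 202),
]


def volume_after(trades, trade_ts, horizon_msec=60_000):
    """Sum sizes of trades strictly after trade_ts and strictly before trade_ts + horizon."""
    end = trade_ts + horizon_msec   # effective timestamp: T - (-horizon_msec)
    start = end - horizon_msec      # a bucket of the same length starts at T
    # Assumption: both bounds are exclusive. The trade itself (ts == start) and a
    # trade exactly at the window end (boundary 'next') are both left out.
    return sum(size for ts, size in trades if start < ts < end)


volumes = [volume_after(trades, ts) for ts, _ in trades]
print(volumes)  # [203, 302, 401, 403, 202, 0]
```

The last trade gets a volume of 0 because no later trades exist in the sample data.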
See also
JOIN_WITH_AGGREGATED_WINDOW OneTick event processor