Aggregating

Let’s start with an unaggregated time series.

import onetick.py as otp

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
otp.run(q, start=otp.dt(2023, 3, 29, 9, 30), end=otp.dt(2023, 3, 29, 16), symbols=['SPY'])
Time PRICE SIZE COND EXCHANGE
0 2023-03-29 09:30:00.000877568 399.920 400 T P
1 2023-03-29 09:30:00.001151232 399.920 1000 T T
2 2023-03-29 09:30:00.001154304 399.920 1000 T T
3 2023-03-29 09:30:00.001921280 399.930 657 T T
4 2023-03-29 09:30:00.010831360 399.925 100 F Z
... ... ... ... ... ...
537609 2023-03-29 15:59:59.994555136 401.350 643 F P
537610 2023-03-29 15:59:59.995045376 401.350 900 F P
537611 2023-03-29 15:59:59.997313024 401.340 100 Z
537612 2023-03-29 15:59:59.997354752 401.340 498 N
537613 2023-03-29 15:59:59.997406208 401.340 200 T

537614 rows × 5 columns

Let’s make a note of the total number of trades, 537,614; the examples below compare output tick counts against it.

We can aggregate over the entire queried interval.

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
q = q.agg({
    'volume': otp.agg.sum('SIZE'),
    'vwap': otp.agg.vwap('PRICE','SIZE')
})
otp.run(q, start=otp.dt(2023, 3, 29, 9, 30), end=otp.dt(2023, 3, 29, 16), symbols=['SPY'])
Time volume vwap
0 2023-03-29 16:00:00 68233701 399.760148
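To make the two metrics concrete, here is a minimal plain-Python sketch (not onetick.py code) of what a sum and a volume-weighted average price compute, applied to the first five trades from the unaggregated output above. The function name `volume_and_vwap` is made up for illustration.

```python
def volume_and_vwap(ticks):
    """Return (total volume, VWAP) for an iterable of (price, size) pairs."""
    volume = 0
    notional = 0.0
    for price, size in ticks:
        volume += size
        notional += price * size
    # VWAP is the size-weighted mean of the prices.
    return volume, notional / volume


# First five SPY trades from the unaggregated output above: (PRICE, SIZE).
first_trades = [(399.920, 400), (399.920, 1000), (399.920, 1000),
                (399.930, 657), (399.925, 100)]

volume, vwap = volume_and_vwap(first_trades)
print(volume, round(vwap, 6))
```

The volume is 3157 and the VWAP is about 399.922239, which agrees with the fifth row of the sliding-window output later in this section, where the window still holds exactly these five trades.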

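Or over fixed buckets (aka bars or windows). bucket_interval is given in seconds, so 1800 produces 30-minute buckets, and each output tick carries the end time of its bucket.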

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
q = q.agg({
    'volume': otp.agg.sum('SIZE'),
    'vwap': otp.agg.vwap('PRICE','SIZE')
}, bucket_interval=1800)
otp.run(q, start=otp.dt(2023, 3, 29, 9, 30), end=otp.dt(2023, 3, 29, 16), symbols=['SPY'])
Time volume vwap
0 2023-03-29 10:00:00 11346391 398.805607
1 2023-03-29 10:30:00 5344474 399.544761
2 2023-03-29 11:00:00 5139562 399.146746
3 2023-03-29 11:30:00 2827218 399.166233
4 2023-03-29 12:00:00 3198533 399.313014
5 2023-03-29 12:30:00 4026223 399.077673
6 2023-03-29 13:00:00 2245602 399.872538
7 2023-03-29 13:30:00 2558837 400.080857
8 2023-03-29 14:00:00 3234347 399.745620
9 2023-03-29 14:30:00 3290063 399.376632
10 2023-03-29 15:00:00 3301416 400.263786
11 2023-03-29 15:30:00 7402762 400.131562
12 2023-03-29 16:00:00 14318273 400.934522
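The bucketing logic can be sketched in plain Python as follows. This is an illustration under stated assumptions, not the onetick.py implementation: times are seconds since the query start, the ticks are invented, the helper names are hypothetical, and a tick that falls exactly on a boundary is assumed to belong to the bucket that starts there. Buckets with no ticks are simply omitted here.

```python
from collections import defaultdict


def bucket_end(offset_s, interval_s):
    """End time of the bucket containing a tick at offset_s (assumed half-open [start, end))."""
    return (offset_s // interval_s + 1) * interval_s


def aggregate_buckets(ticks, interval_s):
    """Map each bucket end time to (volume, vwap) for (offset_s, price, size) ticks."""
    totals = defaultdict(lambda: [0, 0.0])  # bucket end -> [volume, notional]
    for offset_s, price, size in ticks:
        entry = totals[bucket_end(offset_s, interval_s)]
        entry[0] += size
        entry[1] += price * size
    return {end: (vol, notional / vol) for end, (vol, notional) in sorted(totals.items())}


# Invented ticks: (seconds since query start, price, size).
sample_ticks = [(10, 100.0, 200), (1700, 101.0, 100), (1800, 102.0, 300), (3599, 103.0, 100)]
buckets = aggregate_buckets(sample_ticks, 1800)
for end, (vol, vwap) in buckets.items():
    print(end, vol, vwap)
```

The first two ticks land in the bucket ending at 1800 seconds and the last two in the bucket ending at 3600 seconds, mirroring how the first output tick above is stamped 10:00:00 for a query that starts at 09:30:00.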

Or over a sliding window. With running=True, bucket_interval sets the length of the window, here the trailing 30 minutes.

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
q = q.agg({
    'volume': otp.agg.sum('SIZE'),
    'vwap': otp.agg.vwap('PRICE','SIZE')
}, bucket_interval=1800, running=True)
otp.run(q, start=otp.dt(2023, 3, 29, 9, 30), end=otp.dt(2023, 3, 29, 16), symbols=['SPY'])
Time volume vwap
0 2023-03-29 09:30:00.000877568 400 399.920000
1 2023-03-29 09:30:00.001151232 1400 399.920000
2 2023-03-29 09:30:00.001154304 2400 399.920000
3 2023-03-29 09:30:00.001921280 3057 399.922149
4 2023-03-29 09:30:00.010831360 3157 399.922239
... ... ... ...
946090 2023-03-29 15:59:59.994555136 14316575 400.934474
946091 2023-03-29 15:59:59.995045376 14317475 400.934500
946092 2023-03-29 15:59:59.997313024 14317575 400.934502
946093 2023-03-29 15:59:59.997354752 14318073 400.934517
946094 2023-03-29 15:59:59.997406208 14318273 400.934522

946095 rows × 3 columns

Note that there are more output ticks (946,095) than trades (537,614). An output tick is created not only when an input tick enters the window but also when it drops out of it.
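The enter-and-exit behaviour can be sketched in plain Python by turning every tick into two events. This is a simplified model, not onetick.py code: times are seconds since the query start, the ticks are invented, the function name is hypothetical, and it is an assumption that a tick leaves the window exactly window_s seconds after it arrives and that exits at or after the query end produce no output.

```python
def running_sum_events(ticks, window_s, end_s):
    """Return (time, running volume) for every entry and exit event.

    ticks: time-ordered (time_s, size) pairs; end_s: end of the queried interval.
    """
    events = []
    for time_s, size in ticks:
        events.append((time_s, size))  # the tick enters the window
        if time_s + window_s < end_s:
            events.append((time_s + window_s, -size))  # the tick drops out
    events.sort(key=lambda event: event[0])  # stable sort keeps ties in insertion order

    output = []
    volume = 0
    for time_s, delta in events:
        volume += delta
        output.append((time_s, volume))
    return output


# Three invented ticks, a 10-second window, and a 20-second query.
window_output = running_sum_events([(0, 100), (5, 200), (12, 50)], window_s=10, end_s=20)
for row in window_output:
    print(row)
```

Three input ticks yield five output ticks: three entries plus the two exits that happen before the end of the query. The last tick never leaves the window within the queried interval, so it contributes only one output tick.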

We can display all fields of the incoming tick along with the current values of the sliding window metrics.

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
q = q.agg({
    'volume': otp.agg.sum('SIZE'),
    'vwap': otp.agg.vwap('PRICE','SIZE')
}, bucket_interval=1800, running=True, all_fields=True)
otp.run(q, start=otp.dt(2023, 3, 29, 9, 30), end=otp.dt(2023, 3, 29, 16), symbols=['SPY'])
Time PRICE SIZE COND EXCHANGE volume vwap
0 2023-03-29 09:30:00.000877568 399.920 400 T P 400 399.920000
1 2023-03-29 09:30:00.001151232 399.920 1000 T T 1400 399.920000
2 2023-03-29 09:30:00.001154304 399.920 1000 T T 2400 399.920000
3 2023-03-29 09:30:00.001921280 399.930 657 T T 3057 399.922149
4 2023-03-29 09:30:00.010831360 399.925 100 F Z 3157 399.922239
... ... ... ... ... ... ... ...
537609 2023-03-29 15:59:59.994555136 401.350 643 F P 14316575 400.934474
537610 2023-03-29 15:59:59.995045376 401.350 900 F P 14317475 400.934500
537611 2023-03-29 15:59:59.997313024 401.340 100 Z 14317575 400.934502
537612 2023-03-29 15:59:59.997354752 401.340 498 N 14318073 400.934517
537613 2023-03-29 15:59:59.997406208 401.340 200 T 14318273 400.934522

537614 rows × 7 columns

In this case, the number of output ticks again equals the number of trades, because an output tick is created only when an input tick arrives.
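A plain-Python sketch of this arrival-driven variant is shown below. It is an illustration only, not the onetick.py implementation: times are seconds since the query start, the ticks are invented, the function name is hypothetical, and the exact boundary rule (a tick is evicted once it is window_s seconds old) is an assumption.

```python
from collections import deque


def running_with_all_fields(ticks, window_s):
    """Emit one row per input tick, carrying its own fields plus the windowed volume."""
    window = deque()  # (time_s, size) of the ticks currently inside the window
    volume = 0
    rows = []
    for time_s, price, size in ticks:
        # Evict ticks that are window_s seconds old or older (assumed boundary rule).
        while window and window[0][0] + window_s <= time_s:
            volume -= window.popleft()[1]
        window.append((time_s, size))
        volume += size
        rows.append({'Time': time_s, 'PRICE': price, 'SIZE': size, 'volume': volume})
    return rows


# Three invented ticks and a 10-second window.
all_fields_rows = running_with_all_fields([(0, 10.0, 100), (5, 10.5, 200), (12, 10.2, 50)], 10)
for row in all_fields_rows:
    print(row)
```

Evictions still change the running total, but they happen silently while the next tick is processed, so the output has exactly one row per input tick.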

All of the aggregation operations support grouping.

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
q = q.agg({
    'volume': otp.agg.sum('SIZE'),
    'vwap': otp.agg.vwap('PRICE','SIZE')
}, group_by=['EXCHANGE'])
otp.run(q, start=otp.dt(2023, 3, 29, 9, 30), end=otp.dt(2023, 3, 29, 16), symbols=['SPY'])
Time EXCHANGE volume vwap
0 2023-03-29 16:00:00 A 253162 400.322973
1 2023-03-29 16:00:00 B 516793 400.289029
2 2023-03-29 16:00:00 C 154697 400.230680
3 2023-03-29 16:00:00 D 24495779 398.999133
4 2023-03-29 16:00:00 H 1104733 400.336441
5 2023-03-29 16:00:00 J 598534 400.315377
6 2023-03-29 16:00:00 K 3899343 400.118436
7 2023-03-29 16:00:00 L 1948 401.266828
8 2023-03-29 16:00:00 M 1268484 400.170423
9 2023-03-29 16:00:00 N 2010830 400.202291
10 2023-03-29 16:00:00 P 13043077 400.191965
11 2023-03-29 16:00:00 T 10210762 400.208476
12 2023-03-29 16:00:00 U 1780588 400.185296
13 2023-03-29 16:00:00 V 1495374 400.174316
14 2023-03-29 16:00:00 X 597341 400.145003
15 2023-03-29 16:00:00 Y 1680262 400.025433
16 2023-03-29 16:00:00 Z 5121994 400.172607
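Grouping amounts to keeping one set of accumulators per key. The plain-Python sketch below (not onetick.py code; the function name is hypothetical) applies this to the first five trades from the unaggregated output above, keyed by EXCHANGE.

```python
from collections import defaultdict


def volume_and_vwap_by_group(ticks):
    """Return {key: (volume, vwap)} for (key, price, size) ticks."""
    volume = defaultdict(int)
    notional = defaultdict(float)
    for key, price, size in ticks:
        volume[key] += size
        notional[key] += price * size
    return {key: (volume[key], notional[key] / volume[key]) for key in sorted(volume)}


# First five SPY trades from the unaggregated output above: (EXCHANGE, PRICE, SIZE).
first_trades_by_exchange = [('P', 399.920, 400), ('T', 399.920, 1000), ('T', 399.920, 1000),
                            ('T', 399.930, 657), ('Z', 399.925, 100)]

by_exchange = volume_and_vwap_by_group(first_trades_by_exchange)
for exchange, (vol, vwap) in by_exchange.items():
    print(exchange, vol, round(vwap, 6))
```

Each exchange gets its own volume and VWAP, in the same way that the output above has one row per EXCHANGE value.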

All available aggregations are listed in the onetick.py documentation. Their names can also be retrieved with dir(otp.agg).

Use case: Creating Bars

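The code below creates minute bars (bucket_interval=60 seconds). With apply_times_daily=True, the times of day in start and end are applied to each day in the range, so the query covers 09:30 to 16:00 on each of the three days, which gives 3 × 390 = 1170 bars.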

trd = otp.DataSource('NYSE_TAQ', tick_type='TRD')
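# Keep only trades whose COND field contains none of the listed condition codes;
# the second stream returned by the filter (the excluded trades) is discarded.
trd, _ = trd[trd['COND'].str.match('^[^O6TUHILNRWZ47QMBCGPV]*$')]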

bars = trd.agg({
    'OPEN': otp.agg.first('PRICE'),
    'HIGH': otp.agg.max('PRICE'),
    'LOW': otp.agg.min('PRICE'),
    'CLOSE': otp.agg.last('PRICE'),
    'VOLUME': otp.agg.sum('SIZE'),
}, bucket_interval=60)

otp.run(bars, start=otp.dt(2022, 11, 28, 9, 30), end=otp.dt(2022, 11, 30, 16), symbols=['SPY'], apply_times_daily=True)
Time OPEN HIGH LOW CLOSE VOLUME
0 2022-11-28 09:31:00 399.09 399.750 399.000 399.6500 289791
1 2022-11-28 09:32:00 399.69 400.000 399.600 399.8900 295441
2 2022-11-28 09:33:00 399.89 400.175 399.840 400.1750 233032
3 2022-11-28 09:34:00 400.16 400.380 400.000 400.1600 208954
4 2022-11-28 09:35:00 400.13 400.360 400.120 400.3493 112015
... ... ... ... ... ... ...
1165 2022-11-30 15:56:00 406.68 406.830 406.460 406.8100 1311316
1166 2022-11-30 15:57:00 406.80 406.830 406.525 406.6000 1319798
1167 2022-11-30 15:58:00 406.60 406.850 406.550 406.7300 1691348
1168 2022-11-30 15:59:00 406.73 406.950 406.680 406.7100 2592503
1169 2022-11-30 16:00:00 406.71 407.550 406.680 407.4700 6700757

1170 rows × 6 columns
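The five aggregations that make up a bar can be sketched in plain Python as below. This is an illustration rather than onetick.py code: times are seconds since the query start, the ticks are invented, the function name is hypothetical, and a tick exactly on a minute boundary is assumed to open the next bar.

```python
def make_bars(ticks, interval_s):
    """Build {bar end time: OHLCV dict} from time-ordered (time_s, price, size) ticks."""
    bars = {}
    for time_s, price, size in ticks:
        end = (time_s // interval_s + 1) * interval_s  # bars are labeled by their end time
        bar = bars.get(end)
        if bar is None:
            # The first tick of a bar sets every price field.
            bars[end] = {'OPEN': price, 'HIGH': price, 'LOW': price,
                         'CLOSE': price, 'VOLUME': size}
        else:
            bar['HIGH'] = max(bar['HIGH'], price)
            bar['LOW'] = min(bar['LOW'], price)
            bar['CLOSE'] = price  # the most recent tick seen so far
            bar['VOLUME'] += size
    return bars


# Invented ticks spanning two one-minute bars.
sample_bar_ticks = [(0, 10.0, 100), (20, 10.5, 200), (59, 10.2, 50),
                    (60, 10.3, 10), (119, 10.1, 40)]
minute_bars = make_bars(sample_bar_ticks, 60)
for end, bar in minute_bars.items():
    print(end, bar)
```

OPEN and CLOSE depend on tick order, which is why the sketch requires time-ordered input, while HIGH, LOW and VOLUME do not.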

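Note that we have minute bars precomputed; the query below reads them from the NYSE_TAQ_BARS database (tick type TRD_1M). It starts at 09:31 and ends at 16:01 so that it covers the bars stamped 09:31:00 through 16:00:00.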

bars = otp.DataSource('NYSE_TAQ_BARS', tick_type='TRD_1M')
bars = bars[['FIRST', 'HIGH', 'LOW', 'LAST', 'VOLUME']]
otp.run(bars, start=otp.dt(2022, 11, 28, 9, 31), end=otp.dt(2022, 11, 30, 16, 1), symbols=['SPY'], apply_times_daily=True)
Time FIRST HIGH LOW LAST VOLUME
0 2022-11-28 09:31:00 399.09 399.750 399.000 399.6500 289791
1 2022-11-28 09:32:00 399.69 400.000 399.600 399.8900 295441
2 2022-11-28 09:33:00 399.89 400.175 399.840 400.1750 233032
3 2022-11-28 09:34:00 400.16 400.380 400.000 400.1600 208954
4 2022-11-28 09:35:00 400.13 400.360 400.120 400.3493 112015
... ... ... ... ... ... ...
1165 2022-11-30 15:56:00 406.68 406.830 406.460 406.8100 1311316
1166 2022-11-30 15:57:00 406.80 406.830 406.525 406.6000 1319798
1167 2022-11-30 15:58:00 406.60 406.850 406.550 406.7300 1691348
1168 2022-11-30 15:59:00 406.73 406.950 406.680 406.7100 2592503
1169 2022-11-30 16:00:00 406.71 407.550 406.680 407.4700 6700757

1170 rows × 6 columns