Aggregating

Let’s start with an unaggregated time series.

import onetick.py as otp

s = otp.dt(2023, 5, 15, 9, 30)
e = otp.dt(2023, 5, 15, 9, 30, 1)

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
otp.run(q, start=s, end=e, symbols=['SPY'])
Time PRICE SIZE COND EXCHANGE
0 2023-05-15 09:30:00.000178688 412.22 100 T P
1 2023-05-15 09:30:00.000776704 412.22 247 Z
2 2023-05-15 09:30:00.003603456 412.22 100 T T
3 2023-05-15 09:30:00.006352128 412.24 1 I K
4 2023-05-15 09:30:00.007128064 412.24 3 I K
... ... ... ... ... ...
310 2023-05-15 09:30:00.934032640 412.27 160 T T
311 2023-05-15 09:30:00.975609344 412.24 2 I D
312 2023-05-15 09:30:00.980264448 412.27 1 I D
313 2023-05-15 09:30:00.985391616 412.28 100 T
314 2023-05-15 09:30:00.985394944 412.28 100 Q T

315 rows × 5 columns

Let’s make a note of the total number of trades: 315.

The agg method aggregates data.

By default, it aggregates over the entire queried interval:

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
q = q.agg({
    'volume': otp.agg.sum('SIZE'),
    'vwap': otp.agg.vwap('PRICE', 'SIZE'),
    'count': otp.agg.count(),
})
otp.run(q, start=s, end=e, symbols=['SPY'])
Time volume vwap count
0 2023-05-15 09:30:01 324352 412.212012 315
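To make the semantics of these three aggregations concrete, here is a plain-Python sketch. It is not onetick-py code, and the trades are hypothetical sample values rather than the SPY data above. The volume is the sum of SIZE, the VWAP is sum(PRICE * SIZE) / sum(SIZE), and the count is the number of ticks. Returning None for an empty input is a choice made by this sketch, not documented OneTick behavior.

```python
from typing import NamedTuple, Optional


class Trade(NamedTuple):
    # Hypothetical trade record; only PRICE and SIZE matter for these aggregations.
    price: float
    size: int


def aggregate(trades: list) -> dict:
    """Plain-Python model of sum('SIZE'), vwap('PRICE', 'SIZE') and count()."""
    volume = sum(t.size for t in trades)
    notional = sum(t.price * t.size for t in trades)
    # VWAP = sum(PRICE * SIZE) / sum(SIZE); this sketch returns None when volume is 0.
    vwap: Optional[float] = notional / volume if volume else None
    return {'volume': volume, 'vwap': vwap, 'count': len(trades)}


trades = [Trade(10.0, 100), Trade(11.0, 300), Trade(12.0, 100)]
print(aggregate(trades))  # {'volume': 500, 'vwap': 11.0, 'count': 3}
```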

Or over fixed buckets (also known as bars or windows), for example, 100-millisecond buckets. Here bucket_interval is specified in seconds:

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
q = q.agg({
    'volume': otp.agg.sum('SIZE'),
    'vwap': otp.agg.vwap('PRICE', 'SIZE')
}, bucket_interval=.1)
otp.run(q, start=s, end=e, symbols=['SPY'])
Time volume vwap
0 2023-05-15 09:30:00.100 1498 412.235948
1 2023-05-15 09:30:00.200 7743 412.237833
2 2023-05-15 09:30:00.300 1815 412.243515
3 2023-05-15 09:30:00.400 307298 412.210324
4 2023-05-15 09:30:00.500 2205 412.250503
5 2023-05-15 09:30:00.600 416 412.258750
6 2023-05-15 09:30:00.700 276 412.244354
7 2023-05-15 09:30:00.800 1755 412.240875
8 2023-05-15 09:30:00.900 595 412.245782
9 2023-05-15 09:30:01.000 751 412.267643
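The per-bucket volumes above add up to the total of 324352 from the previous query. The plain-Python sketch below (not onetick-py code) models how ticks might be assigned to buckets. It assumes half-open buckets [k * interval, (k + 1) * interval), a query length that is an exact multiple of the bucket length, and one output row per bucket stamped with the bucket's end time, as the timestamps in the output suggest. The trades and helper name are hypothetical.

```python
from collections import defaultdict

# Hypothetical trades: (milliseconds since query start, PRICE, SIZE).
trades = [(3, 10.0, 100), (42, 11.0, 300), (150, 12.0, 200), (990, 13.0, 50)]
QUERY_LEN_MS = 1000  # 09:30:00 to 09:30:01, as in the query above
BUCKET_MS = 100      # bucket_interval=.1 (seconds)


def bucketed(trades, query_len_ms, bucket_ms):
    """Model of non-running agg: one row per bucket, stamped with the bucket END time.

    Assumes half-open buckets [k * bucket_ms, (k + 1) * bucket_ms) and a query
    length that is an exact multiple of the bucket length.
    """
    volume = defaultdict(int)
    notional = defaultdict(float)
    for t, price, size in trades:
        k = t // bucket_ms  # index of the bucket this tick falls into
        volume[k] += size
        notional[k] += price * size
    rows = []
    for k in range(query_len_ms // bucket_ms):
        vol = volume[k]  # 0 for buckets that received no ticks
        rows.append(((k + 1) * bucket_ms, vol, notional[k] / vol if vol else None))
    return rows


for row in bucketed(trades, QUERY_LEN_MS, BUCKET_MS):
    print(row)
```

Using integer milliseconds rather than float seconds keeps the bucket index computation exact.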

Or over a sliding window, by setting running=True (here the window is 100 milliseconds long):

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
q = q.agg({
    'volume': otp.agg.sum('SIZE'),
    'vwap': otp.agg.vwap('PRICE', 'SIZE')
}, bucket_interval=.1, running=True)
otp.run(q, start=s, end=e, symbols=['SPY'])
Time volume vwap
0 2023-05-15 09:30:00.000178688 100 412.220000
1 2023-05-15 09:30:00.000776704 347 412.220000
2 2023-05-15 09:30:00.003603456 447 412.220000
3 2023-05-15 09:30:00.006352128 448 412.220045
4 2023-05-15 09:30:00.007128064 451 412.220177
... ... ... ...
604 2023-05-15 09:30:00.980264448 553 412.263074
605 2023-05-15 09:30:00.982660864 552 412.263062
606 2023-05-15 09:30:00.985391616 652 412.265660
607 2023-05-15 09:30:00.985394944 752 412.267566
608 2023-05-15 09:30:00.993288704 751 412.267643

609 rows × 3 columns

Note that there are more output ticks (609) than trades (315). This is because an output tick is created not only when an input tick enters the window but also when it drops out of it.
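The enter/exit behavior can be modeled in plain Python (not onetick-py code) as a stream of events, where each trade contributes one "enter" event at its own timestamp and one "exit" event one window length later. The tie-breaking rule (exits before entries at the same timestamp) and the decision not to report exits at or after the query end are assumptions of this sketch. The sample trades are hypothetical.

```python
# Hypothetical trades: (milliseconds since query start, PRICE, SIZE).
trades = [(0, 10.0, 100), (30, 12.0, 100), (120, 11.0, 200)]
WINDOW_MS = 100       # bucket_interval=.1 with running=True
QUERY_END_MS = 1000


def running(trades, window_ms, query_end_ms):
    """Model of a running agg: emit a row whenever a tick enters OR leaves the window.

    Assumptions: a tick at time t leaves the window at t + window_ms, exits are
    processed before entries at the same timestamp, and exits at or after the
    query end are not reported.
    """
    events = []  # (time, kind, PRICE, SIZE); kind 0 = exit, 1 = enter
    for t, price, size in trades:
        events.append((t, 1, price, size))
        if t + window_ms < query_end_ms:
            events.append((t + window_ms, 0, price, size))
    events.sort()  # by time, then exits (0) before entries (1)
    volume, notional, rows = 0, 0.0, []
    for t, kind, price, size in events:
        sign = 1 if kind == 1 else -1
        volume += sign * size
        notional += sign * price * size
        rows.append((t, volume, notional / volume if volume else None))
    return rows


rows = running(trades, WINDOW_MS, QUERY_END_MS)
print(len(trades), 'trades ->', len(rows), 'output rows')  # 3 trades -> 6 output rows
```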

Setting all_fields=True displays all fields of the incoming tick along with the current values of the sliding-window metrics:

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
q = q.agg({
    'volume': otp.agg.sum('SIZE'),
    'vwap': otp.agg.vwap('PRICE', 'SIZE')
}, bucket_interval=.1, running=True, all_fields=True)
otp.run(q, start=s, end=e, symbols=['SPY'])
Time PRICE SIZE COND EXCHANGE volume vwap
0 2023-05-15 09:30:00.000178688 412.22 100 T P 100 412.220000
1 2023-05-15 09:30:00.000776704 412.22 247 Z 347 412.220000
2 2023-05-15 09:30:00.003603456 412.22 100 T T 447 412.220000
3 2023-05-15 09:30:00.006352128 412.24 1 I K 448 412.220045
4 2023-05-15 09:30:00.007128064 412.24 3 I K 451 412.220177
... ... ... ... ... ... ... ...
310 2023-05-15 09:30:00.934032640 412.27 160 T T 801 412.255893
311 2023-05-15 09:30:00.975609344 412.24 2 I D 552 412.263062
312 2023-05-15 09:30:00.980264448 412.27 1 I D 553 412.263074
313 2023-05-15 09:30:00.985391616 412.28 100 T 652 412.265660
314 2023-05-15 09:30:00.985394944 412.28 100 Q T 752 412.267566

315 rows × 7 columns

In this case, the output again has as many ticks as there are trades (315), because an output tick is created only when an input tick arrives.
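A plain-Python model (not onetick-py code) of this mode keeps a queue of the ticks currently in the window. On each arrival it evicts expired ticks, adds the new one, and emits the incoming tick's fields merged with the window metrics. The eviction rule (a tick stays while arrival_time - tick_time < window) is an assumption, and the sample ticks are hypothetical.

```python
from collections import deque

# Hypothetical ticks; 'time' is milliseconds since query start.
trades = [
    {'time': 0, 'PRICE': 10.0, 'SIZE': 100, 'EXCHANGE': 'P'},
    {'time': 30, 'PRICE': 12.0, 'SIZE': 100, 'EXCHANGE': 'T'},
    {'time': 120, 'PRICE': 11.0, 'SIZE': 200, 'EXCHANGE': 'Z'},
]
WINDOW_MS = 100


def running_all_fields(trades, window_ms):
    """Model of running=True, all_fields=True: exactly one output row per input tick.

    Assumption: a tick stays in the window while arrival_time - tick_time < window_ms.
    """
    window = deque()
    rows = []
    for tick in trades:
        # Evict ticks that have aged out of the window before this tick arrives.
        while window and tick['time'] - window[0]['time'] >= window_ms:
            window.popleft()
        window.append(tick)
        volume = sum(t['SIZE'] for t in window)
        notional = sum(t['PRICE'] * t['SIZE'] for t in window)
        # Keep every input field and append the current window metrics.
        rows.append({**tick, 'volume': volume, 'vwap': notional / volume if volume else None})
    return rows


for row in running_all_fields(trades, WINDOW_MS):
    print(row)
```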

All aggregation operations support grouping with the group_by parameter. For example, to compute volume and VWAP per exchange:

q = otp.DataSource('NYSE_TAQ', tick_type='TRD')
q = q[['PRICE', 'SIZE', 'COND', 'EXCHANGE']]
q = q.agg({
    'volume': otp.agg.sum('SIZE'),
    'vwap': otp.agg.vwap('PRICE', 'SIZE')
}, group_by=['EXCHANGE'])
otp.run(q, start=s, end=e, symbols=['SPY'])
Time EXCHANGE volume vwap
0 2023-05-15 09:30:01 A 100 412.240000
1 2023-05-15 09:30:01 B 100 412.220000
2 2023-05-15 09:30:01 C 102 412.250000
3 2023-05-15 09:30:01 D 3269 412.231798
4 2023-05-15 09:30:01 H 8 412.260000
5 2023-05-15 09:30:01 K 2559 412.240684
6 2023-05-15 09:30:01 N 422 412.249976
7 2023-05-15 09:30:01 P 304988 412.210209
8 2023-05-15 09:30:01 T 8141 412.243563
9 2023-05-15 09:30:01 U 400 412.250000
10 2023-05-15 09:30:01 X 200 412.250000
11 2023-05-15 09:30:01 Z 4063 412.238673
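Because sum is additive, the per-exchange volumes above add up to the ungrouped total of 324352. The plain-Python sketch below (not onetick-py code) models group_by as independent accumulators keyed by the group field. The sample trades are hypothetical, and sorting the rows by key is a choice of the sketch that happens to match the ordering shown above.

```python
from collections import defaultdict

# Hypothetical trades: (EXCHANGE, PRICE, SIZE).
trades = [('P', 10.0, 100), ('T', 12.0, 50), ('P', 11.0, 300), ('Z', 9.0, 10)]


def agg_by_exchange(trades):
    """Model of agg({...volume, vwap...}, group_by=['EXCHANGE']): one row per distinct key."""
    volume = defaultdict(int)
    notional = defaultdict(float)
    for exchange, price, size in trades:
        volume[exchange] += size
        notional[exchange] += price * size
    # Rows are sorted by group key; this ordering is a choice of the sketch.
    return [(ex, volume[ex], notional[ex] / volume[ex]) for ex in sorted(volume)]


for row in agg_by_exchange(trades):
    print(row)
```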

Note that in non-running mode, OneTick unconditionally divides the whole query interval into buckets of the specified length and outputs one tick per bucket. This means you always get the same number of output ticks (here 1 second / 0.1 seconds = 10), even if the input has fewer ticks or none at all. For example, aggregating this empty data still produces 10 ticks:

t = otp.Empty()
t = t.agg({'COUNT': otp.agg.count()}, bucket_interval=0.1)
otp.run(t, start=s, end=e)
Time COUNT
0 2023-05-15 09:30:00.100 0
1 2023-05-15 09:30:00.200 0
2 2023-05-15 09:30:00.300 0
3 2023-05-15 09:30:00.400 0
4 2023-05-15 09:30:00.500 0
5 2023-05-15 09:30:00.600 0
6 2023-05-15 09:30:00.700 0
7 2023-05-15 09:30:00.800 0
8 2023-05-15 09:30:00.900 0
9 2023-05-15 09:30:01.000 0
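The bucket grid depends only on the query start, end and bucket length, never on the input ticks. The plain-Python sketch below (not onetick-py code; the helper name is hypothetical) reproduces the ten bucket end timestamps for the same start and end as above. It assumes the interval is an exact multiple of the bucket length. It uses timedelta arithmetic, which is exact integer microsecond arithmetic, because float division can misbehave at bucket boundaries (in Python, 0.3 / 0.1 evaluates to 2.9999999999999996).

```python
from datetime import datetime, timedelta

start = datetime(2023, 5, 15, 9, 30)
end = datetime(2023, 5, 15, 9, 30, 1)
bucket = timedelta(milliseconds=100)


def bucket_ends(start, end, bucket):
    """Bucket end timestamps for a non-running agg; independent of the input ticks.

    Assumes (end - start) is an exact multiple of the bucket length.
    """
    n = (end - start) // bucket  # timedelta // timedelta -> int
    return [start + bucket * (k + 1) for k in range(n)]


# With no input ticks, every bucket still exists and its count stays 0.
counts = {ts: 0 for ts in bucket_ends(start, end, bucket)}
for ts, count in counts.items():
    print(ts.time(), count)
```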

A list of all aggregations is available in the onetick-py API reference. It can also be retrieved with dir(otp.agg).

Aggregation Use Cases

Creating Bars

Golden Cross strategy