
Source.process_by_group(process_source_func, group_by=None, source_name=None, num_threads=None, inplace=False)#

Groups data by group_by and run process_source_func for each group and merge outputs for every group. Note process_source_func will be converted to Onetick object and passed to query, that means that python callable will be called only once.

  • process_source_func (callable) – process_source_func should take Source apply necessary logic and return it or tuple of Source in this case all of them should have a common root that is the input Source. The number of sources returned by this method is the same as the number of sources returned by process_source_func.

  • group_by (list) –

    A list of field names to group input ticks by.

    If group_by is None then no group_by fields are defined and logic of process_source_func is applied to all input ticks at once

  • source_name (str) – A name for the source that represents all of group_by sources. Can be passed here or as a name of the inner sources; if passed by both ways, should be consistent

  • num_threads (int) – If specified and not zero, turns on asynchronous processing mode and specifies number of threads to be used for processing input ticks. If this parameter is not specified or zero, then input ticks are processed synchronously.

  • inplace (bool) – If True - nothing will be returned and changes will be applied to current query otherwise changes query will be returned. Error is raised if inplace is set to True and multiple sources returned by process_source_func.

Return type

Source, Tuple[Source] or None


>>> d = otp.Ticks(X=[1, 1, 2, 2],
...               Y=[1, 2, 3, 4])
>>> def func(source):
...     return source.first()
>>> res = d.process_by_group(func, group_by=['X'])
>>> otp.run(res)[["X", "Y"]]
   X  Y
0  1  1
1  2  3

Set asynchronous processing:

>>> res = d.process_by_group(func, group_by=['X'], num_threads=2)
>>> otp.run(res)[['X', 'Y']]
   X  Y
0  1  1
1  2  3

Return multiple outputs, each with unique grouping logic:

>>> d = otp.Ticks(X=[1, 1, 2, 2],
...               Y=[1, 2, 1, 3])
>>> def func(source):
...     source['Z'] = source['X']
...     source2 = source.copy()
...     source = source.first()
...     source2 = source2.last()
...     return source, source2
>>> res1, res2 = d.process_by_group(func, group_by=['Y'])
>>> df1 = otp.run(res1)
>>> df2 = otp.run(res2)
>>> df1[['X', 'Y', 'Z']]
   X  Y  Z
0  1  1  1
1  1  2  1
2  2  3  2
>>> df2[['X', 'Y', 'Z']]
   X  Y  Z
0  1  2  1
1  2  1  2
2  2  3  2

See also

GROUP_BY OneTick event processor