otp.Source.process_by_group
- Source.process_by_group(process_source_func, group_by=None, source_name=None, num_threads=None, inplace=False)
Groups data by group_by, runs process_source_func for each group, and merges the outputs of all groups. Note that process_source_func is converted to a OneTick object and passed to the query, which means the Python callable is called only once.
- Parameters
process_source_func (callable) – process_source_func should take a Source, apply the necessary logic, and return either that Source or a tuple of Source objects. In the latter case all of them should have a common root, which is the input Source. The number of sources returned by this method is the same as the number of sources returned by process_source_func.
group_by (list) –
A list of field names to group input ticks by. If group_by is None, no group_by fields are defined and the logic of process_source_func is applied to all input ticks at once.
source_name (str) – A name for the source that represents all of the group_by sources. It can be passed here or as the name of the inner sources; if passed both ways, the names should be consistent.
num_threads (int) – If specified and non-zero, turns on asynchronous processing mode and sets the number of threads used to process input ticks. If this parameter is not specified or is zero, input ticks are processed synchronously.
inplace (bool) – If True, nothing is returned and the changes are applied to the current query; otherwise the changed query is returned. An error is raised if inplace is set to True and process_source_func returns multiple sources.
- Return type
Examples
>>> d = otp.Ticks(X=[1, 1, 2, 2],
...               Y=[1, 2, 3, 4])
>>> def func(source):
...     return source.first()
>>> res = d.process_by_group(func, group_by=['X'])
>>> otp.run(res)[["X", "Y"]]
   X  Y
0  1  1
1  2  3
Set asynchronous processing:
>>> res = d.process_by_group(func, group_by=['X'], num_threads=2)
>>> otp.run(res)[['X', 'Y']]
   X  Y
0  1  1
1  2  3
Return multiple outputs, each with unique grouping logic:
>>> d = otp.Ticks(X=[1, 1, 2, 2],
...               Y=[1, 2, 1, 3])
>>> def func(source):
...     source['Z'] = source['X']
...     source2 = source.copy()
...     source = source.first()
...     source2 = source2.last()
...     return source, source2
>>> res1, res2 = d.process_by_group(func, group_by=['Y'])
>>> df1 = otp.run(res1)
>>> df2 = otp.run(res2)
>>> df1[['X', 'Y', 'Z']]
   X  Y  Z
0  1  1  1
1  1  2  1
2  2  3  2
>>> df2[['X', 'Y', 'Z']]
   X  Y  Z
0  1  2  1
1  2  1  2
2  2  3  2
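To make the group, apply, and merge semantics easier to follow without a OneTick installation, here is a minimal plain-Python sketch of the multiple-output example. It is only an analogue and not how onetick.py implements the method: the helper process_by_group, the function first_and_last, and the explicit tick-order field "T" are hypothetical names introduced for illustration. It assumes each output is merged back in original tick order, which is what the result of the last example suggests. Unlike the real method, this sketch calls the function once per group instead of converting it once into a OneTick object.

```python
from operator import itemgetter


def process_by_group(rows, func, group_by=None):
    """Hypothetical plain-Python analogue, not the onetick.py implementation."""
    # Split rows into groups; with no group_by fields all rows form one group.
    groups = {}
    for row in rows:
        key = tuple(row[f] for f in group_by) if group_by else ()
        groups.setdefault(key, []).append(row)

    # Apply func to every group and collect each of its outputs separately.
    merged = None
    for group in groups.values():
        out = func(group)
        outputs = out if isinstance(out, tuple) else (out,)
        if merged is None:
            merged = [[] for _ in outputs]
        for acc, part in zip(merged, outputs):
            acc.extend(part)

    if merged is None:  # no input rows at all
        return []
    # Assumption: each output is merged in original tick order ("T").
    merged = [sorted(acc, key=itemgetter("T")) for acc in merged]
    return merged[0] if len(merged) == 1 else tuple(merged)


def first_and_last(group):
    # Add Z = X, then return the first and the last row of the group
    # as two separate outputs.
    with_z = [{**row, "Z": row["X"]} for row in group]
    return with_z[:1], with_z[-1:]


rows = [{"T": t, "X": x, "Y": y}
        for t, (x, y) in enumerate(zip([1, 1, 2, 2], [1, 2, 1, 3]))]
res1, res2 = process_by_group(rows, first_and_last, group_by=["Y"])

xyz = itemgetter("X", "Y", "Z")
print([xyz(r) for r in res1])  # [(1, 1, 1), (1, 2, 1), (2, 3, 2)]
print([xyz(r) for r in res2])  # [(1, 2, 1), (2, 1, 2), (2, 3, 2)]
```

The two printed lists correspond row by row to df1 and df2 in the example above. The number of results returned by the helper equals the number of outputs returned by the per-group function, mirroring the rule stated for process_source_func.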
See also
GROUP_BY OneTick event processor