otp.Source.process_by_group#
- Source.process_by_group(process_source_func, group_by=None, source_name=None, num_threads=None, inplace=False)#
- Groups data by - group_byand run- process_source_funcfor each group and merge outputs for every group. Note- process_source_funcwill be converted to Onetick object and passed to query, that means that python callable will be called only once.- Parameters
- process_source_func (callable) – - process_source_funcshould take- Sourceapply necessary logic and return it or tuple of- Sourcein this case all of them should have a common root that is the input- Source. The number of sources returned by this method is the same as the number of sources returned by- process_source_func.
- group_by (list) – - A list of field names to group input ticks by. - If group_by is None then no group_by fields are defined and logic of - process_source_funcis applied to all input ticks at once
- source_name (str) – A name for the source that represents all of group_by sources. Can be passed here or as a name of the inner sources; if passed by both ways, should be consistent 
- num_threads (int) – If specified and not zero, turns on asynchronous processing mode and specifies number of threads to be used for processing input ticks. If this parameter is not specified or zero, then input ticks are processed synchronously. 
- inplace (bool) – If True - nothing will be returned and changes will be applied to current query otherwise changes query will be returned. Error is raised if - inplaceis set to True and multiple sources returned by- process_source_func.
- self (Source) – 
 
- Return type
 - Examples - >>> d = otp.Ticks(X=[1, 1, 2, 2], ... Y=[1, 2, 3, 4]) >>> def func(source): ... return source.first() >>> res = d.process_by_group(func, group_by=['X']) >>> otp.run(res)[["X", "Y"]] X Y 0 1 1 1 2 3 - Set asynchronous processing: - >>> res = d.process_by_group(func, group_by=['X'], num_threads=2) >>> otp.run(res)[['X', 'Y']] X Y 0 1 1 1 2 3 - Return multiple outputs, each with unique grouping logic: - >>> d = otp.Ticks(X=[1, 1, 2, 2], ... Y=[1, 2, 1, 3]) >>> def func(source): ... source['Z'] = source['X'] ... source2 = source.copy() ... source = source.first() ... source2 = source2.last() ... return source, source2 >>> res1, res2 = d.process_by_group(func, group_by=['Y']) >>> df1 = otp.run(res1) >>> df2 = otp.run(res2) >>> df1[['X', 'Y', 'Z']] X Y Z 0 1 1 1 1 1 2 1 2 2 3 2 >>> df2[['X', 'Y', 'Z']] X Y Z 0 1 2 1 1 2 1 2 2 2 3 2 - See also - GROUP_BY OneTick event processor