otp.agg.generic
- generic(query_fun, bucket_delimiter=False, bucket_interval=0, bucket_units=None, bucket_time='end', bucket_end_condition=None, running=False, group_by=None, end_condition_per_group=False, boundary_tick_bucket='new')
Generic aggregation. The aggregation logic is provided in the query_fun parameter and is applied to the ticks in each bucket. Currently, this aggregation can be used only with the .apply() method.
Note that query_fun should return a Source object, and the resulting query is assumed to produce only one tick per bucket.
Also, query_fun can take additional parameters, which are passed to query_fun during aggregation. These parameters should be specified in .apply() as keyword arguments, e.g. .apply(src, additional_param=1).
- Parameters
query_fun (Callable) – Function that takes a Source as a parameter, applies some aggregation logic to it and returns a Source as a result. Note that currently only methods that support dynamic symbol change can be used in the provided function. For example, rename() can’t be used. If you try to use such methods here, you will get an error at runtime.
bucket_delimiter (bool, default=False) – When set to True, an extra tick is created after each bucket. Also, an additional column, called DELIMITER, is added to the output ticks. The extra tick has all fields set to their defaults (0, NaN, “”), except the delimiter field, which is set to “D”. All other ticks have DELIMITER set to the string “0”.
bucket_interval (Union[int, onetick.py.core.column_operations.base.Operation], default=0) –
Determines the length of each bucket (the units depend on bucket_units).
If an Operation is passed, it acts as bucket_end_condition.
bucket_units (Optional[Literal['seconds', 'ticks', 'days', 'months', 'flexible']], default=None) –
Sets the bucket interval units.
By default, if bucket_units and bucket_end_condition are not specified, it is set to seconds. If bucket_end_condition is specified, then bucket_units is set to flexible.
If set to flexible, then bucket_end_condition must be set.
Note that the seconds bucket unit doesn’t take into account the daylight-saving time of the timezone, so you may not get the expected results when using, for example, 24 * 60 * 60 seconds as the bucket interval. In such a case use the days bucket unit instead. See the example in onetick.py.agg.sum().
bucket_time (Literal['start', 'end'], default=end) –
Controls the output timestamp.
start
the timestamp assigned to the bucket is the start time of the bucket.
end
the timestamp assigned to the bucket is the end time of the bucket.
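The daylight-saving caveat in the bucket_units note can be checked with plain Python. The sketch below does not use onetick.py; it only illustrates why 24 * 60 * 60 elapsed seconds is not always one calendar day. The timezone and date are assumptions picked for the illustration (a day on which US clocks move forward).

```python
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo

# Assumed timezone and date, chosen only for illustration.
tz = ZoneInfo("America/New_York")
utc = ZoneInfo("UTC")

# Local midnight on a day when clocks move forward by one hour.
start = datetime(2021, 3, 14, 0, 0, tzinfo=tz)

# "seconds"-style step: exactly 24 * 60 * 60 elapsed seconds (computed in UTC).
by_seconds = (start.astimezone(utc) + timedelta(seconds=24 * 60 * 60)).astimezone(tz)

# "days"-style step: same wall-clock time on the next calendar day.
by_days = start + timedelta(days=1)

print(by_seconds)  # 2021-03-15 01:00:00-04:00
print(by_days)     # 2021-03-15 00:00:00-04:00
```

The seconds-based boundary drifts to 01:00 local time, while the calendar-day boundary stays at midnight, which is the kind of mismatch the note warns about.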
bucket_end_condition (condition, default=None) –
An expression that is evaluated on every tick. If it evaluates to “True”, then a new bucket is created. This parameter is only used if bucket_units is set to “flexible”.
It can also be set via the bucket_interval parameter by passing an Operation object.
running (bool, default=False) –
Aggregation will be calculated as a sliding window. The running and bucket_interval parameters determine when new buckets are created.
running = True
aggregation will be calculated in a sliding window.
bucket_interval = N (N > 0)
Window size will be N. An output tick is generated when a tick “enters” the window (arrival event) and when it “exits” the window (exit event).
bucket_interval = 0
The left boundary of the window is bound to the start time. For each tick, the aggregation is calculated over [start_time; tick_t].
running = False
buckets partition the [query start time, query end time) interval into non-overlapping intervals of size bucket_interval (with the last interval possibly of a smaller size). If bucket_interval is set to 0, a single bucket for the entire interval is created.
Note that in non-running mode OneTick unconditionally divides the whole time interval into the specified number of buckets. It means that you will always get this number of ticks in the result, even if you have fewer ticks in the input data.
Default: False
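The difference between the two modes can be sketched in plain Python. This is not how OneTick implements bucketing; it is a minimal model under stated assumptions: tick times are integers (say, milliseconds from the query start), the aggregation is a simple count, and outputs are labelled with the bucket end time. The helper names fixed_bucket_counts and running_counts are hypothetical. The sliding window with bucket_interval = N > 0 is left out, because its exact arrival and exit semantics are not spelled out here.

```python
import math


def fixed_bucket_counts(tick_times, start, end, interval):
    """running=False: count ticks in non-overlapping buckets covering [start, end)."""
    if interval == 0:
        # A single bucket spans the whole query interval.
        return [(end, sum(start <= t < end for t in tick_times))]
    n_buckets = math.ceil((end - start) / interval)
    result = []
    for i in range(n_buckets):
        lo = start + i * interval
        hi = min(lo + interval, end)  # the last bucket may be shorter
        # Every bucket produces an output, even when it holds no ticks.
        result.append((hi, sum(lo <= t < hi for t in tick_times)))
    return result


def running_counts(tick_times):
    """running=True, bucket_interval=0: one output per tick, over [start, tick time]."""
    return [(t, i + 1) for i, t in enumerate(sorted(tick_times))]


ticks = [0, 1, 2, 7]
print(fixed_bucket_counts(ticks, start=0, end=12, interval=5))  # [(5, 3), (10, 1), (12, 0)]
print(fixed_bucket_counts(ticks, start=0, end=12, interval=0))  # [(12, 4)]
print(running_counts(ticks))                                    # [(0, 1), (1, 2), (2, 3), (7, 4)]
```

In the first call the third bucket is empty and shorter than the others, yet it still yields an output, which mirrors the note about always getting the specified number of ticks.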
group_by (list, str or expression, default=None) – When specified, each bucket is broken further into additional sub-buckets based on the specified field values. If an Operation is used, then a GROUP_{i} column is added, where i is the index in the group_by list. For example, if an Operation is the only element in the group_by list, then the GROUP_0 field will be added.
end_condition_per_group (bool, default=False) –
Controls application of bucket_end_condition in groups.
end_condition_per_group = True
bucket_end_condition is applied only to the group defined by group_by
end_condition_per_group = False
bucket_end_condition is applied across all groups
This parameter is only used if bucket_units is set to “flexible”.
When set to True, applies to all bucketing conditions. Useful, for example, if you need to specify group_by, and you want to group items first, and create buckets after that.
boundary_tick_bucket (Literal['new', 'previous'], default=new) –
Controls boundary tick ownership.
previous
A tick on which bucket_end_condition evaluates to “true” belongs to the bucket being closed.
new
The tick belongs to the new bucket.
This parameter is only used if bucket_units is set to “flexible”.
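How bucket_end_condition and boundary_tick_bucket interact can be modelled with a short pure-Python sketch. It does not call onetick.py, and flexible_buckets is a hypothetical helper. Ticks are plain values and the end condition is an ordinary Python predicate. Empty buckets are dropped here for simplicity, which is an assumption of the sketch rather than documented OneTick behaviour.

```python
def flexible_buckets(ticks, end_condition, boundary_tick_bucket="new"):
    """Split ticks into buckets; a bucket boundary occurs where end_condition(tick) is true."""
    if boundary_tick_bucket not in ("new", "previous"):
        raise ValueError("boundary_tick_bucket must be 'new' or 'previous'")
    buckets, current = [], []
    for tick in ticks:
        if end_condition(tick):
            if boundary_tick_bucket == "previous":
                current.append(tick)  # the boundary tick closes the old bucket
                buckets.append(current)
                current = []
            else:
                if current:  # assumption: skip empty buckets
                    buckets.append(current)
                current = [tick]  # the boundary tick opens the new bucket
        else:
            current.append(tick)
    if current:
        buckets.append(current)  # ticks remaining after the last boundary
    return buckets


ticks = [1, 2, 10, 3, 4, 10, 5]
print(flexible_buckets(ticks, lambda t: t == 10, "new"))       # [[1, 2], [10, 3, 4], [10, 5]]
print(flexible_buckets(ticks, lambda t: t == 10, "previous"))  # [[1, 2, 10], [3, 4, 10], [5]]
```

The only difference between the two results is which side of the boundary each tick equal to 10 lands on.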
Examples
The simplest case, just copying some other aggregation logic:
>>> data = otp.Ticks({'A': [1, 2, 3]})
>>> def agg_fun(source):
...     return source.agg({'X': otp.agg.count()})
>>> data = otp.agg.generic(agg_fun).apply(data)
>>> otp.run(data)
        Time  X
0 2003-12-04  3
Passing parameters to aggregation function:
>>> data = otp.Ticks({'A': [1, 2, 1]})
>>> def count_values(source, value):
...     values, _ = source[source['A'] == value]
...     return values.agg({'count': otp.agg.count()})
>>> data = otp.agg.generic(count_values).apply(data, value=1)
>>> otp.run(data)
        Time  count
0 2003-12-04      2
Getting first 3 ticks from 5 milliseconds buckets:
>>> data = otp.Ticks({'A': list(range(10))})
>>> def agg_fun(source, n):
...     return source.first(n)
>>> data = otp.agg.generic(agg_fun, bucket_interval=0.005).apply(data, n=3)
>>> otp.run(data)
                     Time  A
0 2003-12-01 00:00:00.005  0
1 2003-12-01 00:00:00.005  1
2 2003-12-01 00:00:00.005  2
3 2003-12-01 00:00:00.010  5
4 2003-12-01 00:00:00.010  6
5 2003-12-01 00:00:00.010  7
See also
GENERIC_AGGREGATION OneTick event processor