# otp.agg.generic

### generic(query_fun, bucket_delimiter=False, bucket_interval=0, bucket_units=None, bucket_time='end', bucket_end_condition=None, running=False, group_by=None, groups_to_display='all', end_condition_per_group=False, boundary_tick_bucket='new')

Generic aggregation.
Aggregation logic is provided in `query_fun` parameter
and this logic is applied for ticks in each bucket.
Currently, this aggregation can be used only with `.apply()` method.

Note, that `query_fun` should return a [`Source`](../source/root.md#onetick.py.Source) object,
assuming that resulted query have only one tick per bucket.

Also, `query_fun` could have additional parameters, which will be passed to `query_fun`
during aggregation. Those parameters should be specified in `.apply()` as keyword arguments,
ex: `.apply(src, additional_param=1)`.

Generic aggregations could be also used in [`onetick.py.Source.agg()`](../source/agg.md#onetick.py.Source.agg) method.
Dict key set for an aggregation in `aggs` parameter will be used as prefix
for each output column of this aggregation.
Also, all tick fields not contained in the Python-level schema in a passed to generic aggregation
`Source` object will be removed.

* **Parameters:**
  * **query_fun** (*Callable*) -- Function that takes [`Source`](../source/root.md#onetick.py.Source) as a parameter,
    applies some aggregation logic to it
    and returns [`Source`](../source/root.md#onetick.py.Source) as a result.
    Note that currently only methods that support dynamic symbol change
    could be used in the provided function.
    For example, [`rename()`](../source/rename.md#onetick.py.Source.rename) can't be used.
    If you try to use such methods here, you will get an error during runtime.
  * **bucket_delimiter** ([*bool*](https://docs.python.org/3/library/functions.html#bool) *,* *default=False*) -- When set to `True` an extra tick is created after each bucket.
    Also, an additional column, called DELIMITER, is added to output ticks.
    The extra tick has values of all fields set to the defaults (0,NaN,""),
    except the delimiter field, which is set to "D"
    All other ticks have the DELIMITER set to string zero "0".
  * **bucket_interval** (int or float or [`Operation`](../operation/root.md#onetick.py.Operation) or [`OnetickParameter`](../misc/param.md#onetick.py.core.column_operations.base.OnetickParameter) or [`symbol parameter`](../source/Symbol.md#onetick.py.core._source.symbol.SymbolType.__getitem__) or [datetime offset object](../datetime/offsets/root.md#id1), default=0) -- 

    Determines the length of each bucket (units depends on `bucket_units`).

    If [`Operation`](../operation/root.md#onetick.py.Operation) of bool type is passed, acts as `bucket_end_condition`.

    Bucket interval can also be set as a *float* value
    if `bucket_units` is set to *seconds*.
    Note that values less than 0.001 (1 millisecond) are not supported.

    Bucket interval can be set via some of the [datetime offset objects](../datetime/offsets/root.md#id1):
    [`otp.Milli`](../datetime/offsets/milli.md#onetick.py.Milli), [`otp.Second`](../datetime/offsets/second.md#onetick.py.Second),
    [`otp.Minute`](../datetime/offsets/minute.md#onetick.py.Minute), [`otp.Hour`](../datetime/offsets/hour.md#onetick.py.Hour),
    [`otp.Day`](../datetime/offsets/day.md#onetick.py.Day), [`otp.Month`](../datetime/offsets/month.md#onetick.py.Month).
    In this case you could omit setting `bucket_units` parameter.

    Bucket interval can also be set with integer [`OnetickParameter`](../misc/param.md#onetick.py.core.column_operations.base.OnetickParameter)
    or [`symbol parameter`](../source/Symbol.md#onetick.py.core._source.symbol.SymbolType.__getitem__).
  * **bucket_units** (*Optional* *[**Literal* *[* *'seconds'* *,*  *'ticks'* *,*  *'days'* *,*  *'months'* *,*  *'flexible'* *]* *]* *,* *default=None*) -- 

    Set bucket interval units.

    By default, if `bucket_units` and `bucket_end_condition` not specified, set to **seconds**.
    If `bucket_end_condition` specified, then `bucket_units` set to **flexible**.

    If set to **flexible** then `bucket_end_condition` must be set.

    Note that **seconds** bucket unit doesn't take into account daylight-saving time of the timezone,
    so you may not get expected results when using, for example, 24 \* 60 \* 60 seconds as bucket interval.
    In such case use **days** bucket unit instead.
    See example in [`onetick.py.agg.sum()`](sum.md#onetick.py.agg.sum).
  * **bucket_time** (*Literal* *[* *'start'* *,*  *'end'* *]* *,* *default=end*) -- 

    Control output timestamp.
    * **start**

      the timestamp  assigned to the bucket is the start time of the bucket.
    * **end**

      the timestamp assigned to the bucket is the end time of the bucket.
  * **bucket_end_condition** (*condition* *,* *default=None*) -- 

    An expression that is evaluated on every tick. If it evaluates to "True", then a new bucket is created.
    This parameter is only used if `bucket_units` is set to "flexible".

    Also can be set via `bucket_interval` parameter by passing [`Operation`](../operation/root.md#onetick.py.Operation) object.
  * **running** ([*bool*](https://docs.python.org/3/library/functions.html#bool) *,* *default=False*) -- 

    See [Aggregation buckets guide](root.md#buckets-guide) to see examples of how this parameter works.

    Specifies if the aggregation will be calculated as a sliding window.
    `running` and `bucket_interval` parameters determines when new buckets are created.
    * `running` = True

      aggregation will be calculated in a sliding window.
      * `bucket_interval` = N (N > 0)

        Window size will be N. Output tick will be generated when tick "enter" window (**arrival event**) and
        when "exit" window (**exit event**)
      * `bucket_interval` = 0

        Left boundary of window will be set to query start time. For each tick aggregation will be calculated in
        the interval [start_time; tick_t] from query start time to the tick's timestamp (inclusive).
    * `running` = False (default)

      buckets partition the [query start time, query end time) interval into non-overlapping intervals
      of size `bucket_interval` (with the last interval possibly of a smaller size).
      If `bucket_interval` is set to **0** a single bucket for the entire interval is created.

      Note that in non-running mode OneTick unconditionally divides the whole time interval
      into specified number of buckets.
      It means that you will always get this specified number of ticks in the result,
      even if you have less ticks in the input data.

    Default: False
  * **group_by** ([*list*](https://docs.python.org/3/library/stdtypes.html#list) *,* [*str*](https://docs.python.org/3/library/stdtypes.html#str) *or* *expression* *,* *default=None*) -- When specified, each bucket is broken further into additional sub-buckets based on specified field values.
    If [`Operation`](../operation/root.md#onetick.py.Operation) is used then GROUP_{i} column is added. Where i is index in group_by list.
    For example, if Operation is the only element in `group_by` list then GROUP_0 field will be added.
  * **groups_to_display** (*Literal* *[* *'all'* *,*  *'previous'* *]* *,* *default=all*) -- Specifies for which sub-buckets (groups) ticks should be shown for each bucket interval.
    By default **all** groups are shown at the end of each bucket interval.
    If this parameter is set to **event_in_last_bucket**, only the groups that received at least one tick
    within a given bucket interval are shown.
  * **end_condition_per_group** ([*bool*](https://docs.python.org/3/library/functions.html#bool) *,* *default=False*) -- 

    Controls application of `bucket_end_condition` in groups.
    * `end_condition_per_group` = True

      `bucket_end_condition` is applied only to the group defined by `group_by`
    * `end_condition_per_group` = False

      `bucket_end_condition` applied across all groups

    This parameter is only used if `bucket_units` is set to "flexible".

    When set to True, applies to all bucketing conditions. Useful, for example, if you need to specify `group_by`,
    and you want to group items first, and create buckets after that.
  * **boundary_tick_bucket** (*Literal* *[* *'new'* *,*  *'previous'* *]* *,* *default=new*) -- 

    Controls boundary tick ownership.
    * **previous**

      A tick on which `bucket_end_condition` evaluates to "true" belongs to the bucket being closed.
    * **new**

      tick belongs to the new bucket.

    This parameter is only used if `bucket_units` is set to "flexible"

#### NOTE
Some functions may be not supported in `query_fun`.
For example, [`join()`](../functions/join.md#onetick.py.join) and [`rename()`](../source/rename.md#onetick.py.Source.rename).

### Examples

The simplest case, just copying some other aggregation logic:

```pycon
>>> data = otp.Ticks({'A': [1, 2, 3]})
>>> def agg_fun(source):
...     return source.agg({'X': otp.agg.count()})
>>> data = otp.agg.generic(agg_fun).apply(data)
>>> otp.run(data)
        Time  X
0 2003-12-04  3
```

Passing parameters to aggregation function:

```pycon
>>> data = otp.Ticks({'A': [1, 2, 1]})
>>> def count_values(source, value):
...     values = source.where(source['A'] == value)
...     return values.agg({'count': otp.agg.count()})
>>> data = otp.agg.generic(count_values).apply(data, value=1)
>>> otp.run(data)
        Time  count
0 2003-12-04  2
```

Getting first 3 ticks from 5 milliseconds buckets:

```pycon
>>> data = otp.Ticks({'A': list(range(10))})
>>> def agg_fun(source, n):
...     return source.first(n)
>>> data = otp.agg.generic(agg_fun, bucket_interval=0.005).apply(data, n=3)
>>> otp.run(data, start=otp.config.default_start_time, end=otp.config.default_start_time + otp.Milli(10))
                     Time  A
0 2003-12-01 00:00:00.005  0
1 2003-12-01 00:00:00.005  1
2 2003-12-01 00:00:00.005  2
3 2003-12-01 00:00:00.010  5
4 2003-12-01 00:00:00.010  6
5 2003-12-01 00:00:00.010  7
```

Using generic aggregation inside [`onetick.py.Source.agg()`](../source/agg.md#onetick.py.Source.agg) method:

```pycon
>>> def agg_fun(source):
...     return source.agg({'SUM': otp.agg.sum('X'), 'AVG': otp.agg.average('X')})
>>> data = otp.Ticks(X=[1, 2, 3, 4, 5])
>>> data = data.agg({'X': otp.agg.generic(agg_fun)})
>>> otp.run(data)
        Time  X.SUM  X.AVG
0 2003-12-04     15    3.0
```

#### SEE ALSO
**GENERIC_AGGREGATION** OneTick event processor
