otp.join_by_time#

join_by_time(sources, how='outer', on=None, policy=None, check_schema=True, leading=0, match_if_identical_times=None, output_type_index=None, use_rename_ep=True)#

Joins ticks from multiple input time series, based on input tick timestamps.

leading source tick joined with already arrived ticks from other sources.

>>> leading = otp.Ticks(A=[1, 2], offset=[1, 3])
>>> other = otp.Ticks(B=[1], offset=[2])
>>> otp.run(otp.join_by_time([leading, other]))
                     Time  A  B
0 2003-12-01 00:00:00.001  1  0
1 2003-12-01 00:00:00.003  2  1

In case you willing to add prefix/suffix to all columns in one of the sources you should use Source.add_prefix() or Source.add_suffix()

Parameters
  • sources (Collection[Source]) – The collection of Source objects which will be joined

  • how ('outer' or 'inner') – The method of join (inner or outer)

  • on (Collection[Column]) –

    on add an extra check to join - only ticks with same on fields will be joined

    >>> leading = otp.Ticks(A=[1, 2], offset=[1, 3])
    >>> other = otp.Ticks(A=[2, 2], B=[1, 2], offset=[0, 2])
    >>> otp.run(otp.join_by_time([leading, other], on=['A']))
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  0
    1 2003-12-01 00:00:00.003  2  2
    

  • policy ('arrival_order', 'latest_ticks', 'each_for_leader_with_first' or 'each_for_leader_with_latest') –

    Policy of joining ticks with same timestamps

    >>> leading = otp.Ticks(A=[1, 2], offset=[0, 0], OMDSEQ=[0, 3])
    >>> other = otp.Ticks(B=[1, 2], offset=[0, 0], OMDSEQ=[2, 4])
    

    Note: in the examples below we assume that all ticks have same timestamps, but order of ticks as in example. OMDSEQ is a special field that store order of ticks with same timestamp

    • arrival_order output tick generated on arrival of leading source tick

    >>> data = otp.join_by_time([leading, other], policy='arrival_order')
    >>> otp.run(data)[['Time', 'A', 'B']]
            Time  A  B
    0 2003-12-01  1  0
    1 2003-12-01  2  1
    
    • latest_ticks Tick generated at the time of expiration of a particular timestamp (when all ticks from all sources for current timestamp arrived). Only latest tick from leading source will be used.

    >>> data = otp.join_by_time([leading, other], policy='latest_ticks')
    >>> otp.run(data)[['Time', 'A', 'B']]
            Time  A  B
    0 2003-12-01  2  2
    
    • each_for_leader_with_first Each tick from leading source will be joined with first tick from other sources for current timestamp

    >>> data = otp.join_by_time(
    ...     [leading, other],
    ...     policy='each_for_leader_with_first'
    ... )
    >>> otp.run(data)[['Time', 'A', 'B']]
            Time  A  B
    0 2003-12-01  1  1
    1 2003-12-01  2  1
    
    • each_for_leader_with_latest Each tick from leading source will be joined with last tick from other sources for current timestamp

    >>> data = otp.join_by_time(
    ...     [leading, other],
    ...     policy='each_for_leader_with_latest'
    ... )
    >>> otp.run(data)[['Time', 'A', 'B']]
            Time  A  B
    0 2003-12-01  1  2
    1 2003-12-01  2  2
    

  • check_schema (bool) – If True onetick.py will check that all columns names are unambiguous and columns listed in on param are exists in sources schema. Which can lead to false positive error in case of some event processors were sink to Source. To avoid this set check_scheme to False.

  • leading (int, ‘all’, Source, list of int, list of Source) – A list sources or their indexes. If this parameter is ‘all’, every source is considered to be leading.

  • match_if_identical_times (bool) – A True value of this parameter causes an output tick to be formed from input ticks with identical timestamps only. If on is set to ‘outer’, default values of fields (otp.nan, 0, empty string) are propagated for sources that did not tick at a given timestamp. If this parameter is set to True, the default value of policy parameter is set to ‘latest_ticks’.

  • output_type_index (int) – Specifies index of source in sources from which type and properties of output will be taken. Useful when joining sources that inherited from Source. By default output object type will be Source.

  • use_rename_ep (bool) – Use onetick.query.RenameFields event processor or not. This event processor can’t be used in generic aggregation.

Returns

A time series of ticks.

Return type

Source or same class as sources[output_type_index]

Examples

>>> d1 = otp.Ticks({'A': [1, 2, 3], 'offset': [1, 2, 3]})
>>> d2 = otp.Ticks({'B': [1, 2, 3], 'offset': [1, 2, 4]})
>>> otp.run(otp.join_by_time([d1, d2]))
                     Time  A  B
0 2003-12-01 00:00:00.001  1  0
1 2003-12-01 00:00:00.002  2  1
2 2003-12-01 00:00:00.003  3  2
>>> otp.run(otp.join_by_time([d1, d2], leading=1))
                     Time  A  B
0 2003-12-01 00:00:00.001  1  1
1 2003-12-01 00:00:00.002  2  2
2 2003-12-01 00:00:00.004  3  3
>>> otp.run(otp.join_by_time([d1, d2], leading=1, match_if_identical_times=True))
                     Time  A  B
0 2003-12-01 00:00:00.001  1  1
1 2003-12-01 00:00:00.002  2  2
2 2003-12-01 00:00:00.004  0  3

Adding prefix to right source for all columns:

>>> otp.run(otp.join_by_time([d1, d2.add_prefix('right_')]))
                     Time  A  right_B
0 2003-12-01 00:00:00.001  1        0
1 2003-12-01 00:00:00.002  2        1
2 2003-12-01 00:00:00.003  3        2

Use parameter output_type_index to specify which input class to use to create output object. It may be useful in case some custom user class was used as input:

>>> class CustomTick(otp.Tick):
...     def custom_method(self):
...         return 'custom_result'
>>> data1 = otp.Tick(A=1)
>>> data2 = CustomTick(B=2)
>>> data = otp.join_by_time([data1, data2], match_if_identical_times=True, output_type_index=1)
>>> type(data)
<class 'onetick.py.functions.CustomTick'>
>>> data.custom_method()
'custom_result'
>>> otp.run(data)
        Time  A  B
0 2003-12-01  1  2

See also

JOIN_BY_TIME OneTick event processor