otp.Source.join_with_collection
Source.join_with_collection(collection_name, query_func=None, how='outer', params=None, start=None, end=None, prefix=None, caching=None, keep_time=None, default_fields_for_outer_join=None)
For each tick, uses query_func to join ticks from the collection_name tick collection (tick set, unordered tick set, tick list, or tick deque).
- Parameters
collection_name (str) – Name of the collection state variable from which to join ticks. Collections can be of the following types: TickSet, TickSetUnordered, TickList, and TickDeque.
query_func (callable) – Callable query_func should return a Source. If passed, this query is applied to the ticks from the collection before they are joined. In this case, the query_func object is evaluated by OneTick (not Python) for every input tick. Note that the Python code is executed only once, so any Python conditional expressions are also evaluated only once.
The callable should have a source parameter. When the callable is called, this parameter holds a Source object representing the ticks loaded directly from the collection. Any operation applied to this source is applied to the ticks from the collection before they are joined.
The callable should also have parameters named after the keys of params, if params is specified in this method.
If query_func is not passed, all ticks from the collection are joined.
how ('inner', 'outer') – Type of join. If 'inner', an output tick is propagated only if some ticks from the collection were joined to the input tick.
params (dict) – Mapping of parameter names to their values for query_func. Columns can be used as values.
start (datetime, Operation) – Start time for selecting ticks from the collection. If specified, only collection ticks with a timestamp greater than or equal to start are processed. If not passed, there is no lower time bound for the collection ticks, which means that even ticks with TIMESTAMP lower than _START_TIME of the main query will be joined.
end (datetime, Operation) – End time for selecting ticks from the collection. If specified, only collection ticks with a timestamp lower than end are processed. If not passed, there is no upper time bound for the collection ticks, which means that even ticks with TIMESTAMP higher than _END_TIME of the main query will be joined.
prefix (str) – Prefix for the names of joined tick fields.
caching (str) – If None, caching is disabled. Caching can be enabled with the following value:
'per_symbol': the cache is different for each symbol.
keep_time (str) – Name for the joined timestamp column. None means no timestamp column will be joined.
default_fields_for_outer_join (dict) – With an outer join, all output ticks have the fields from the schema of the joined source. If nothing was joined to a particular output tick, these fields have the default values for their types. This parameter overrides the values added to ticks for which nothing was joined. Dictionary keys should be field names, and dictionary values should be constants or Operation expressions.
self (Source) –
- Returns
Source with joined ticks from
collection_name
- Return type
Source
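The parameter semantics above can be summarized in a small pure-Python model. This is a sketch under stated assumptions, not OneTick code: ticks are plain dicts, the collection is a static list rather than a state variable that changes over time, the hypothetical schema argument supplies the per-type default values, and query_func is an ordinary function that receives the selected collection ticks together with the current input tick (standing in for params). The function name join_with_collection_model is hypothetical, and how prefix interacts with keep_time and with the keys of default_fields_for_outer_join is a guess. It illustrates inclusive start, exclusive end, inner versus outer behavior, and the default-value override.

```python
from datetime import datetime, timedelta
from typing import Any, Callable, Optional

# Hypothetical representation: one tick is a dict; "TIMESTAMP" holds its time.
Tick = dict[str, Any]


def join_with_collection_model(
    ticks: list[Tick],
    collection: list[Tick],
    schema: dict[str, Any],
    query_func: Optional[Callable[[list[Tick], Tick], list[Tick]]] = None,
    how: str = "outer",
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
    prefix: str = "",
    keep_time: Optional[str] = None,
    default_fields_for_outer_join: Optional[dict[str, Any]] = None,
) -> list[Tick]:
    """Model of the documented join semantics; not the OneTick implementation.

    schema maps each joined field to its type's default value (e.g. 0 or "").
    query_func receives the selected collection ticks plus the current input
    tick, which stands in for params bound to columns (assumption).
    """
    if how not in ("inner", "outer"):
        raise ValueError("how must be 'inner' or 'outer'")
    out: list[Tick] = []
    for tick in ticks:
        # start is inclusive, end is exclusive; None means no bound at all.
        selected = [
            c for c in collection
            if (start is None or c["TIMESTAMP"] >= start)
            and (end is None or c["TIMESTAMP"] < end)
        ]
        if query_func is not None:
            selected = query_func(selected, tick)
        if not selected:
            if how == "inner":
                continue  # inner: drop input ticks that got nothing joined
            # outer: keep the tick, filling joined fields with (overridden) defaults.
            # Assumption: prefix is applied to the default_fields keys as well.
            fill = {**schema, **(default_fields_for_outer_join or {})}
            row = {**tick, **{prefix + k: v for k, v in fill.items()}}
            if keep_time:
                row[keep_time] = None  # placeholder; the real default is not modeled
            out.append(row)
            continue
        for c in selected:
            row = dict(tick)  # input fields, including TIMESTAMP, are kept
            row.update({prefix + k: c[k] for k in schema})
            if keep_time:
                # Assumption: the keep_time column name is not prefixed.
                row[keep_time] = c["TIMESTAMP"]  # joined tick's own timestamp
            out.append(row)
    return out


if __name__ == "__main__":
    t0 = datetime(2024, 1, 1)
    ticks = [{"TIMESTAMP": t0, "A": 1}]
    coll = [{"TIMESTAMP": t0, "B": 1, "C": "STR"}]
    schema = {"B": 0, "C": ""}
    print(join_with_collection_model(ticks, coll, schema))
    # start is after the only collection tick: nothing is selected, so inner drops the tick
    print(join_with_collection_model(ticks, coll, schema, how="inner",
                                     start=t0 + timedelta(seconds=1)))
```

The model emits one output row per joined collection tick and keeps the input tick's own TIMESTAMP, which is why keep_time is the only way to carry the joined tick's time into the result.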
Examples
>>> src = otp.Tick(A=1)
>>> src.state_vars['TICK_SET'] = otp.state.tick_set('LATEST_TICK', 'B', otp.eval(otp.Tick(B=1, C='STR')))
>>> src = src.join_with_collection('TICK_SET')
>>> otp.run(src)[["A", "B", "C"]]
   A  B    C
0  1  1  STR
>>> src = otp.Ticks(A=[1, 2, 3, 4, 5],
...                 B=[2, 2, 3, 3, 3])
>>> src.state_vars['TICK_LIST'] = otp.state.tick_list()
>>> def fun(tick):
...     tick.state_vars['TICK_LIST'].push_back(tick)
>>> src = src.script(fun)
>>> def join_fun(source, param_b):
...     source = source.agg(dict(VALUE=otp.agg.sum(source['A'])))
...     source['VALUE'] = source['VALUE'] + param_b
...     return source
>>> src = src.join_with_collection('TICK_LIST', join_fun, params=dict(param_b=src['B']))
>>> otp.run(src)[["A", "B", "VALUE"]]
   A  B  VALUE
0  1  2      3
1  2  2      5
2  3  3      9
3  4  3     13
4  5  3     18
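The VALUE column above can be re-derived in plain Python. The sketch below assumes, consistently with the printed output, that script pushes each tick into TICK_LIST before join_with_collection runs for that tick, so the list already contains the current tick; the variable names are illustrative only and this is not OneTick code.

```python
# Plain-Python re-computation of the TICK_LIST example's VALUE column.
A = [1, 2, 3, 4, 5]
B = [2, 2, 3, 3, 3]

tick_list: list[dict[str, int]] = []  # stands in for the TICK_LIST state variable
values: list[int] = []
for a, b in zip(A, B):
    tick_list.append({"A": a, "B": b})  # script(fun): push_back(tick)
    total = sum(t["A"] for t in tick_list)  # join_fun: otp.agg.sum(source['A'])
    values.append(total + b)  # VALUE + param_b, with param_b bound to this tick's B

print(values)  # [3, 5, 9, 13, 18]
```

Each VALUE is the running sum of A over all ticks pushed so far plus the current B, which is why a single aggregated tick is joined to every input tick.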
Join last standing quote from each exchange to trades:
>>> trd = otp.Ticks(offset=[1000, 2000, 3000, 4000, 5000],
...                 PRICE=[10.1, 10.2, 10.15, 10.23, 10.4],
...                 SIZE=[100, 50, 100, 60, 200])
>>> qte = otp.Ticks(offset=[500, 600, 1200, 2500, 3500, 3600, 4800],
...                 EXCHANGE=['N', 'C', 'Q', 'Q', 'C', 'N', 'C'],
...                 ASK_PRICE=[10.2, 10.18, 10.18, 10.15, 10.31, 10.32, 10.44],
...                 BID_PRICE=[10.1, 10.17, 10.17, 10.1, 10.23, 10.31, 10.4])
>>> trd['TICK_TYPE'] = 'TRD'
>>> qte['TICK_TYPE'] = 'QTE'
>>> trd_qte = trd + qte
>>> trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'] = otp.state.tick_set(
...     'LATEST', 'EXCHANGE',
...     schema=['EXCHANGE', 'ASK_PRICE', 'BID_PRICE'])
>>> trd_qte = trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].update(where=trd_qte['TICK_TYPE'] == 'QTE',
...                                                                value_fields=['ASK_PRICE', 'BID_PRICE'])
>>> trd, _ = trd_qte[trd_qte['TICK_TYPE'] == 'TRD']
>>> trd.drop(['ASK_PRICE', 'BID_PRICE', 'EXCHANGE'], inplace=True)
>>> trd = trd.join_with_collection('LAST_QUOTE_PER_EXCHANGE')
>>> otp.run(trd)[['PRICE', 'SIZE', 'EXCHANGE', 'ASK_PRICE', 'BID_PRICE']]
    PRICE  SIZE EXCHANGE  ASK_PRICE  BID_PRICE
0   10.10   100        N      10.20      10.10
1   10.10   100        C      10.18      10.17
2   10.20    50        N      10.20      10.10
3   10.20    50        C      10.18      10.17
4   10.20    50        Q      10.18      10.17
5   10.15   100        N      10.20      10.10
6   10.15   100        C      10.18      10.17
7   10.15   100        Q      10.15      10.10
8   10.23    60        N      10.32      10.31
9   10.23    60        C      10.31      10.23
10  10.23    60        Q      10.15      10.10
11  10.40   200        N      10.32      10.31
12  10.40   200        C      10.44      10.40
13  10.40   200        Q      10.15      10.10
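A plain-Python model of the quote/trade example may help when checking the output above. A dict keyed by EXCHANGE plays the role of the 'LATEST' tick set (each new quote overwrites the previous one for its exchange), and every trade is expanded into one row per exchange present in the set at that moment. The model iterates exchanges in first-insertion order, which happens to match the order shown above; whether OneTick guarantees that order for a tick set is an assumption not asserted here.

```python
# Plain-Python model of the LAST_QUOTE_PER_EXCHANGE example (not OneTick code).
trades = [(1000, 10.1, 100), (2000, 10.2, 50), (3000, 10.15, 100),
          (4000, 10.23, 60), (5000, 10.4, 200)]  # (offset, PRICE, SIZE)
quotes = [(500, "N", 10.2, 10.1), (600, "C", 10.18, 10.17),
          (1200, "Q", 10.18, 10.17), (2500, "Q", 10.15, 10.1),
          (3500, "C", 10.31, 10.23), (3600, "N", 10.32, 10.31),
          (4800, "C", 10.44, 10.4)]  # (offset, EXCHANGE, ASK_PRICE, BID_PRICE)

# trd + qte: merge both streams in time order (all offsets are distinct here).
events = sorted(
    [(t, "TRD", (price, size)) for t, price, size in trades]
    + [(t, "QTE", (exch, ask, bid)) for t, exch, ask, bid in quotes],
    key=lambda e: e[0],
)

last_quote: dict[str, tuple[float, float]] = {}  # EXCHANGE -> (ASK_PRICE, BID_PRICE)
rows: list[tuple] = []  # (PRICE, SIZE, EXCHANGE, ASK_PRICE, BID_PRICE)
for _, kind, payload in events:
    if kind == "QTE":
        exch, ask, bid = payload
        last_quote[exch] = (ask, bid)  # 'LATEST': overwrite the tick for this key
    else:
        price, size = payload
        # join_with_collection: one output row per tick currently in the set
        for exch, (ask, bid) in last_quote.items():
            rows.append((price, size, exch, ask, bid))

for row in rows:
    print(row)
```

Because the trade at offset 1000 arrives before any Q quote, it is joined to only two exchanges, which is why the output has 14 rows rather than 15.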
See also
JOIN_WITH_COLLECTION_SUMMARY OneTick event processor