otp.Source.join_with_collection
- Source.join_with_collection(collection_name, query_func=None, how='outer', params=None, start=None, end=None, prefix=None, caching=None, keep_time=None, default_fields_for_outer_join=None)
- For each input tick, uses query_func to join ticks from the collection_name tick collection (tick set, unordered tick set, tick list, or tick deque).
- Parameters
- collection_name (str) – Name of the collection state variable from which to join ticks. Supported collection types are TickSet, TickSetUnordered, TickList, and TickDeque.
- query_func (callable) –
  - query_func should be a callable that returns a Source. If passed, this query is applied to ticks from the collection before they are joined. In this case, the query_func object is evaluated by OneTick (not Python) for every input tick. Note that the Python code itself is executed only once, so any Python conditional expressions in it are also evaluated only once.
  - The callable should have a source parameter. When the callable is called, this parameter holds a Source object representing ticks loaded directly from the collection. Any operation applied to this source is applied to the collection ticks before they are joined.
  - The callable should also have parameters named after the keys of params, if params is specified in this method.
  - If query_func is not passed, all ticks from the collection are joined.
- how ('inner', 'outer') – Type of join. If 'inner', an output tick is propagated only if at least one tick from the collection was joined to the input tick. If 'outer' (the default), the input tick is propagated even if nothing was joined to it.
- params (dict) – Mapping of parameter names to their values for query_func. Columns can be used as values.
- start (datetime, Operation) – Start time for selecting ticks from the collection. If specified, only collection ticks with a timestamp greater than or equal to start are processed. If not passed, there is no lower time bound for the collection ticks, so even ticks with TIMESTAMP lower than the _START_TIME of the main query are joined.
- end (datetime, Operation) – End time for selecting ticks from the collection. If specified, only collection ticks with a timestamp strictly lower than end are processed. If not passed, there is no upper time bound for the collection ticks, so even ticks with TIMESTAMP higher than the _END_TIME of the main query are joined.
- prefix (str) – Prefix for the names of joined tick fields. 
- caching (str) – If None, caching is disabled. Caching can be enabled with the following value:
  - 'per_symbol': the cache is different for each symbol.
- keep_time (str) – Name of the column that stores the timestamp of each joined tick. If None, no timestamp column is joined.
- default_fields_for_outer_join (dict) – With an outer join, all output ticks have the fields from the schema of the joined source. If nothing was joined to a particular output tick, these fields are set to the default values for their types. This parameter lets you override the values added to ticks for which nothing was joined. Dictionary keys should be field names, and dictionary values should be constants or Operation expressions.
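The parameter semantics above can be summarized with a small pure-Python model. This is a sketch under stated assumptions, not onetick-py code. The function join_with_collection_model, the dict-based ticks, the TIMESTAMP key, and the schema argument (mapping each joined field name to its type default) are all hypothetical.

The model makes these choices:
- start is inclusive and end is exclusive.
- prefix is applied to the joined field names.
- The joined timestamp is stored under keep_time.
- An outer join with no matches emits one row, filled from default_fields_for_outer_join or from the type defaults.

Two behaviors are not stated on this page, so the model picks one option for each and flags it in the comments: the keep_time value when nothing is joined, and whether override keys include the prefix.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional

Tick = Dict[str, Any]


def join_with_collection_model(
    tick: Tick,
    collection: List[Tick],
    schema: Dict[str, Any],
    how: str = "outer",
    start: Optional[datetime] = None,
    end: Optional[datetime] = None,
    prefix: Optional[str] = None,
    keep_time: Optional[str] = None,
    default_fields_for_outer_join: Optional[Dict[str, Any]] = None,
) -> List[Tick]:
    """Hypothetical model: join one input tick with the ticks of a collection.

    `schema` maps each joined field name to the default value for its type
    (an assumption of this sketch, supplied by the caller).
    """
    if how not in ("inner", "outer"):
        raise ValueError("how must be 'inner' or 'outer'")
    pre = prefix or ""
    # start is inclusive, end is exclusive; a bound that is None is not applied.
    selected = [
        c for c in collection
        if (start is None or c["TIMESTAMP"] >= start)
        and (end is None or c["TIMESTAMP"] < end)
    ]
    out: List[Tick] = []
    for c in selected:
        row = dict(tick)  # the input tick keeps its own fields and timestamp
        for name in schema:
            row[pre + name] = c[name]
        if keep_time is not None:
            row[keep_time] = c["TIMESTAMP"]  # joined tick's timestamp
        out.append(row)
    if not out and how == "outer":
        # Nothing joined: still emit the input tick, with joined fields set to
        # overrides (keyed by unprefixed name, an assumption) or type defaults.
        overrides = default_fields_for_outer_join or {}
        row = dict(tick)
        for name, type_default in schema.items():
            row[pre + name] = overrides.get(name, type_default)
        if keep_time is not None:
            row[keep_time] = None  # assumption: OneTick's actual value may differ
        out.append(row)
    return out


# Usage: two quotes in the collection, one trade as the input tick.
t0 = datetime(2024, 1, 1, 9, 30)
collection = [
    {"TIMESTAMP": t0, "EXCHANGE": "N", "BID_PRICE": 10.1},
    {"TIMESTAMP": t0.replace(minute=31), "EXCHANGE": "C", "BID_PRICE": 10.17},
]
schema = {"EXCHANGE": "", "BID_PRICE": 0.0}
trade = {"TIMESTAMP": t0.replace(minute=32), "PRICE": 10.2}

rows = join_with_collection_model(trade, collection, schema,
                                  start=t0.replace(minute=31),
                                  prefix="Q_", keep_time="Q_TIME")
print(rows)  # only the C quote is at or after start
```

Calling the model with how="inner" on an empty selection returns an empty list. This corresponds to the input tick not being propagated.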
 
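The query_func and params descriptions imply a build-once, evaluate-per-tick model. The sketch below mirrors the cumulative-sum example from the Examples section in plain Python, without onetick-py.

The names build_join_fun and run_example are hypothetical. A closure stands in for the query that OneTick evaluates, and a Python list stands in for the TICK_LIST state variable. The builder body runs once. The returned query runs for every input tick, with param_b bound to that tick's B value. In this model, a Python if placed in the builder body would likewise run only once, matching the note about conditional expressions.

```python
from typing import Callable, Dict, List, Tuple

Tick = Dict[str, int]
# A "query" is modeled as a function: (collection ticks, per-tick params) -> joined ticks.
Query = Callable[[List[Tick], Dict[str, int]], List[Tick]]


def build_join_fun(calls: List[str]) -> Query:
    """Hypothetical analogue of join_fun: this Python body runs once, at build time."""
    calls.append("build")  # record that construction happened

    def query(source: List[Tick], params: Dict[str, int]) -> List[Tick]:
        total = sum(t["A"] for t in source)  # like otp.agg.sum(source['A'])
        return [{"VALUE": total + params["param_b"]}]  # like source['VALUE'] + param_b

    return query


def run_example(ticks: List[Tick]) -> Tuple[List[Tick], int]:
    calls: List[str] = []
    query = build_join_fun(calls)  # built once, before any tick is processed
    tick_list: List[Tick] = []  # stands in for the TICK_LIST state variable
    out: List[Tick] = []
    for tick in ticks:
        tick_list.append(dict(tick))  # like push_back(tick) in the script step
        params = {"param_b": tick["B"]}  # like params=dict(param_b=src['B'])
        for joined in query(tick_list, params):
            out.append({**tick, **joined})
    return out, len(calls)


ticks = [{"A": a, "B": b} for a, b in zip([1, 2, 3, 4, 5], [2, 2, 3, 3, 3])]
rows, builds = run_example(ticks)
print([r["VALUE"] for r in rows], builds)  # [3, 5, 9, 13, 18] 1
```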
- Returns
- Source with ticks joined from collection_name
- Return type
- Examples
  >>> src = otp.Tick(A=1)
  >>> src.state_vars['TICK_SET'] = otp.state.tick_set('LATEST_TICK', 'B', otp.eval(otp.Tick(B=1, C='STR')))
  >>> src = src.join_with_collection('TICK_SET')
  >>> otp.run(src)[["A", "B", "C"]]
     A  B    C
  0  1  1  STR

  >>> src = otp.Ticks(A=[1, 2, 3, 4, 5],
  ...                 B=[2, 2, 3, 3, 3])
  >>> src.state_vars['TICK_LIST'] = otp.state.tick_list()
  >>> def fun(tick): tick.state_vars['TICK_LIST'].push_back(tick)
  >>> src = src.script(fun)
  >>> def join_fun(source, param_b):
  ...     source = source.agg(dict(VALUE=otp.agg.sum(source['A'])))
  ...     source['VALUE'] = source['VALUE'] + param_b
  ...     return source
  >>> src = src.join_with_collection('TICK_LIST', join_fun, params=dict(param_b=src['B']))
  >>> otp.run(src)[["A", "B", "VALUE"]]
     A  B  VALUE
  0  1  2      3
  1  2  2      5
  2  3  3      9
  3  4  3     13
  4  5  3     18

  Join the last standing quote from each exchange to trades:

  >>> trd = otp.Ticks(offset=[1000, 2000, 3000, 4000, 5000],
  ...                 PRICE=[10.1, 10.2, 10.15, 10.23, 10.4],
  ...                 SIZE=[100, 50, 100, 60, 200])
  >>> qte = otp.Ticks(offset=[500, 600, 1200, 2500, 3500, 3600, 4800],
  ...                 EXCHANGE=['N', 'C', 'Q', 'Q', 'C', 'N', 'C'],
  ...                 ASK_PRICE=[10.2, 10.18, 10.18, 10.15, 10.31, 10.32, 10.44],
  ...                 BID_PRICE=[10.1, 10.17, 10.17, 10.1, 10.23, 10.31, 10.4])
  >>> trd['TICK_TYPE'] = 'TRD'
  >>> qte['TICK_TYPE'] = 'QTE'
  >>> trd_qte = trd + qte
  >>> trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'] = otp.state.tick_set(
  ...     'LATEST', 'EXCHANGE',
  ...     schema=['EXCHANGE', 'ASK_PRICE', 'BID_PRICE'])
  >>> trd_qte = trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].update(where=trd_qte['TICK_TYPE'] == 'QTE',
  ...                                                                value_fields=['ASK_PRICE', 'BID_PRICE'])
  >>> trd, _ = trd_qte[trd_qte['TICK_TYPE'] == 'TRD']
  >>> trd.drop(['ASK_PRICE', 'BID_PRICE', 'EXCHANGE'], inplace=True)
  >>> trd = trd.join_with_collection('LAST_QUOTE_PER_EXCHANGE')
  >>> otp.run(trd)[['PRICE', 'SIZE', 'EXCHANGE', 'ASK_PRICE', 'BID_PRICE']]
      PRICE  SIZE  EXCHANGE  ASK_PRICE  BID_PRICE
  0   10.10   100         N      10.20      10.10
  1   10.10   100         C      10.18      10.17
  2   10.20    50         N      10.20      10.10
  3   10.20    50         C      10.18      10.17
  4   10.20    50         Q      10.18      10.17
  5   10.15   100         N      10.20      10.10
  6   10.15   100         C      10.18      10.17
  7   10.15   100         Q      10.15      10.10
  8   10.23    60         N      10.32      10.31
  9   10.23    60         C      10.31      10.23
  10  10.23    60         Q      10.15      10.10
  11  10.40   200         N      10.32      10.31
  12  10.40   200         C      10.44      10.40
  13  10.40   200         Q      10.15      10.10
- See also
  JOIN_WITH_COLLECTION_SUMMARY OneTick event processor
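To check the output of the last-quote-per-exchange example by hand, the following pure-Python sketch replays the same trades and quotes without onetick-py. The function name last_quote_join is hypothetical.

A dict keyed by EXCHANGE stands in for the 'LATEST' tick set: a newer quote replaces the stored one for the same exchange. Every trade is joined with all stored quotes. Iterating the dict in first-insertion order happens to reproduce the N, C, Q row order shown in the example. OneTick's own iteration order for tick sets is not specified on this page.

```python
from typing import Dict, List, Tuple

# Data copied from the example:
# trades are (offset_ms, PRICE, SIZE); quotes are (offset_ms, EXCHANGE, ASK_PRICE, BID_PRICE).
TRADES = [(1000, 10.1, 100), (2000, 10.2, 50), (3000, 10.15, 100), (4000, 10.23, 60), (5000, 10.4, 200)]
QUOTES = [(500, "N", 10.2, 10.1), (600, "C", 10.18, 10.17), (1200, "Q", 10.18, 10.17),
          (2500, "Q", 10.15, 10.1), (3500, "C", 10.31, 10.23), (3600, "N", 10.32, 10.31),
          (4800, "C", 10.44, 10.4)]

Row = Tuple[float, int, str, float, float]


def last_quote_join(trades, quotes) -> List[Row]:
    # Merge both streams by offset, like trd + qte (no ties occur in this data).
    events = sorted([(ts, "QTE", rest) for ts, *rest in quotes]
                    + [(ts, "TRD", rest) for ts, *rest in trades],
                    key=lambda e: e[0])
    latest: Dict[str, Tuple[float, float]] = {}  # EXCHANGE -> (ASK_PRICE, BID_PRICE)
    out: List[Row] = []
    for _, kind, rest in events:
        if kind == "QTE":
            exchange, ask, bid = rest
            latest[exchange] = (ask, bid)  # keep only the latest quote per exchange
        else:
            price, size = rest
            for exchange, (ask, bid) in latest.items():  # one output row per stored quote
                out.append((price, size, exchange, ask, bid))
    return out


rows = last_quote_join(TRADES, QUOTES)
for row in rows:
    print(row)
```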