otp.Source.join_with_collection
- Source.join_with_collection(collection_name, query_func=None, how='outer', params=None, start=None, end=None, prefix=None, caching=None, keep_time=None, default_fields_for_outer_join=None)
For each tick, uses query_func to join ticks from the collection_name tick collection (tick set, unordered tick set, tick list, or tick deque).
- Parameters
collection_name (str) – Name of the collection state variable from which to join ticks. The supported collection types are TickSet, TickSetUnordered, TickList and TickDeque.
query_func (callable) –
Callable that should return a Source. If passed, this query is applied to the ticks from the collection before they are joined. In this case, the query_func object is evaluated by OneTick (not Python) for every input tick. Note that the Python code is executed only once, so all Python conditional expressions are evaluated only once too.
The callable should have a source parameter. When the callable is called, this parameter holds a Source object representing the ticks loaded directly from the collection. Any operation applied to this source is applied to the ticks from the collection before they are joined.
The callable should also have parameters named after the keys of params, if params is specified in this method.
If query_func is not passed, all ticks from the collection are joined.
how ('inner', 'outer') – Type of join. If 'inner', an output tick is propagated only if some ticks from the collection were joined to the input tick.
params (dict) – Mapping of parameter names to their values for query_func. Columns can be used as values.
start (datetime, Operation) – Start time to select ticks from the collection. If specified, only ticks in the collection with a timestamp greater than or equal to start are processed. If not passed, there is no lower time bound for the collection ticks. This means that even ticks with TIMESTAMP lower than _START_TIME of the main query will be joined.
end (datetime, Operation) – End time to select ticks from the collection. If specified, only ticks in the collection with a timestamp strictly lower than end are processed. If not passed, there is no upper time bound for the collection ticks. This means that even ticks with TIMESTAMP higher than _END_TIME of the main query will be joined.
prefix (str) – Prefix for the names of joined tick fields.
caching (str) –
If None, caching is disabled. Caching can be enabled with the following values:
'per_symbol': the cache is different for each symbol.
keep_time (str) – Name for the joined timestamp column. None means no timestamp column will be joined.
default_fields_for_outer_join (dict) – When you use an outer join, all output ticks have the fields from the schema of the joined source. If nothing was joined to a particular output tick, these fields have the default values for their type. This parameter overrides the values added to ticks for which nothing was joined. Dictionary keys should be field names, and dictionary values should be constants or Operation expressions.
- Returns
Source with joined ticks from collection_name
- Return type
Examples
>>> src = otp.Tick(A=1)
>>> src.state_vars['TICK_SET'] = otp.state.tick_set('LATEST_TICK', 'B', otp.eval(otp.Tick(B=1, C='STR')))
>>> src = src.join_with_collection('TICK_SET')
>>> otp.run(src)[["A", "B", "C"]]
   A  B    C
0  1  1  STR
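The how, start, end and default_fields_for_outer_join parameters described above can be pictured with a small plain-Python model. This is only a sketch of the documented semantics, not the onetick-py implementation: join_model, its schema argument and the TIME key are hypothetical names introduced for illustration, and the type defaults here are Python's (int() and str()), which is an assumption rather than OneTick's actual default values.

```python
def join_model(ticks, collection, schema, how="outer",
               start=None, end=None, defaults=None):
    """Plain-Python model of the join semantics; not the onetick-py code."""
    if how not in ("inner", "outer"):
        raise ValueError("how must be 'inner' or 'outer'")
    out = []
    for tick in ticks:
        # start is inclusive and end is exclusive; None means unbounded.
        matched = [
            c for c in collection
            if (start is None or c["TIME"] >= start)
            and (end is None or c["TIME"] < end)
        ]
        if matched:
            # Every selected collection tick yields one output tick.
            for c in matched:
                out.append({**tick, **{f: c[f] for f in schema}})
        elif how == "outer":
            # Nothing joined: fill joined fields with type defaults,
            # then apply the caller's overrides.
            fill = {f: typ() for f, typ in schema.items()}
            fill.update(defaults or {})
            out.append({**tick, **fill})
        # how == "inner" and nothing joined: the input tick is dropped.
    return out


collection = [{"TIME": 1, "B": 10, "C": "x"}, {"TIME": 5, "B": 20, "C": "y"}]
ticks = [{"A": 1}]
schema = {"B": int, "C": str}

all_joined = join_model(ticks, collection, schema)
from_five = join_model(ticks, collection, schema, start=5)
before_five = join_model(ticks, collection, schema, end=5)
inner_empty = join_model(ticks, collection, schema, how="inner", start=6)
outer_default = join_model(ticks, collection, schema, start=6)
outer_override = join_model(ticks, collection, schema, start=6, defaults={"B": -1})
```

In this model an input tick is repeated once per selected collection tick, an inner join drops input ticks with no match, and an outer join keeps them with default or overridden field values.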
>>> src = otp.Ticks(A=[1, 2, 3, 4, 5],
...                 B=[2, 2, 3, 3, 3])
>>> src.state_vars['TICK_LIST'] = otp.state.tick_list()
>>> def fun(tick): tick.state_vars['TICK_LIST'].push_back(tick)
>>> src = src.script(fun)
>>> def join_fun(source, param_b):
...     source = source.agg(dict(VALUE=otp.agg.sum(source['A'])))
...     source['VALUE'] = source['VALUE'] + param_b
...     return source
>>> src = src.join_with_collection('TICK_LIST', join_fun, params=dict(param_b=src['B']))
>>> otp.run(src)[["A", "B", "VALUE"]]
   A  B  VALUE
0  1  2      3
1  2  2      5
2  3  3      9
3  4  3     13
4  5  3     18
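The note that the Python code of query_func runs only once, while the resulting query is evaluated for every input tick, can be illustrated by analogy in plain Python. The sketch below does not use onetick-py: query_func here is a hypothetical builder that returns a per-tick function, standing in for the query that OneTick would evaluate, and use_offset is an invented flag.

```python
python_calls = 0  # counts how often the Python body of the builder runs


def query_func(use_offset):
    """Hypothetical builder: returns a per-tick function (the 'plan')."""
    global python_calls
    python_calls += 1  # Python body runs here, at build time
    if use_offset:
        # This Python `if` is decided once and frozen into the plan.
        return lambda tick: tick["A"] + tick["B"]
    return lambda tick: tick["A"]


plan = query_func(use_offset=True)  # built once
ticks = [{"A": 1, "B": 2}, {"A": 2, "B": 2}, {"A": 3, "B": 3}]
results = [plan(t) for t in ticks]  # the plan is evaluated per tick
```

The practical consequence is that a Python if inside query_func cannot branch on per-tick values; per-tick logic has to be expressed through operations on the source itself.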
Join last standing quote from each exchange to trades:
>>> trd = otp.Ticks(offset=[1000, 2000, 3000, 4000, 5000],
...                 PRICE=[10.1, 10.2, 10.15, 10.23, 10.4],
...                 SIZE=[100, 50, 100, 60, 200])
>>> qte = otp.Ticks(offset=[500, 600, 1200, 2500, 3500, 3600, 4800],
...                 EXCHANGE=['N', 'C', 'Q', 'Q', 'C', 'N', 'C'],
...                 ASK_PRICE=[10.2, 10.18, 10.18, 10.15, 10.31, 10.32, 10.44],
...                 BID_PRICE=[10.1, 10.17, 10.17, 10.1, 10.23, 10.31, 10.4])
>>> trd['TICK_TYPE'] = 'TRD'
>>> qte['TICK_TYPE'] = 'QTE'
>>> trd_qte = trd + qte
>>> trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'] = otp.state.tick_set(
...     'LATEST', 'EXCHANGE',
...     schema=['EXCHANGE', 'ASK_PRICE', 'BID_PRICE'])
>>> trd_qte = trd_qte.state_vars['LAST_QUOTE_PER_EXCHANGE'].update(where=trd_qte['TICK_TYPE'] == 'QTE',
...                                                                value_fields=['ASK_PRICE', 'BID_PRICE'])
>>> trd, _ = trd_qte[trd_qte['TICK_TYPE'] == 'TRD']
>>> trd.drop(['ASK_PRICE', 'BID_PRICE', 'EXCHANGE'], inplace=True)
>>> trd = trd.join_with_collection('LAST_QUOTE_PER_EXCHANGE')
>>> otp.run(trd)[['PRICE', 'SIZE', 'EXCHANGE', 'ASK_PRICE', 'BID_PRICE']]
    PRICE  SIZE EXCHANGE  ASK_PRICE  BID_PRICE
0   10.10   100        N      10.20      10.10
1   10.10   100        C      10.18      10.17
2   10.20    50        N      10.20      10.10
3   10.20    50        C      10.18      10.17
4   10.20    50        Q      10.18      10.17
5   10.15   100        N      10.20      10.10
6   10.15   100        C      10.18      10.17
7   10.15   100        Q      10.15      10.10
8   10.23    60        N      10.32      10.31
9   10.23    60        C      10.31      10.23
10  10.23    60        Q      10.15      10.10
11  10.40   200        N      10.32      10.31
12  10.40   200        C      10.44      10.40
13  10.40   200        Q      10.15      10.10
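To see why each trade is repeated once per exchange, the same logic can be replayed in plain Python, without onetick-py. This is a sketch under stated assumptions: a dict keyed by EXCHANGE stands in for the 'LATEST' tick set, and the row order within a trade follows Python's dict insertion order, which happens to match the output shown for this data but is not a statement about how OneTick orders tick set entries.

```python
# (offset, PRICE, SIZE) and (offset, EXCHANGE, ASK_PRICE, BID_PRICE),
# copied from the example data.
trades = [(1000, 10.1, 100), (2000, 10.2, 50), (3000, 10.15, 100),
          (4000, 10.23, 60), (5000, 10.4, 200)]
quotes = [(500, "N", 10.2, 10.1), (600, "C", 10.18, 10.17),
          (1200, "Q", 10.18, 10.17), (2500, "Q", 10.15, 10.1),
          (3500, "C", 10.31, 10.23), (3600, "N", 10.32, 10.31),
          (4800, "C", 10.44, 10.4)]

# Merge both streams into one time-ordered stream, like trd + qte.
events = [(t, "TRD", (price, size)) for t, price, size in trades]
events += [(t, "QTE", (exch, ask, bid)) for t, exch, ask, bid in quotes]
events.sort(key=lambda e: e[0])

last_quote = {}  # EXCHANGE -> (ASK_PRICE, BID_PRICE); models the tick set
rows = []
for _, kind, payload in events:
    if kind == "QTE":
        exch, ask, bid = payload
        last_quote[exch] = (ask, bid)  # keep only the latest quote per key
    else:
        price, size = payload
        # Join every tick currently in the collection to this trade.
        for exch, (ask, bid) in last_quote.items():
            rows.append((price, size, exch, ask, bid))

for row in rows:
    print(row)
```

The first trade produces only two rows because the first quote from exchange Q arrives after it; every later trade sees all three exchanges.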
See also
JOIN_WITH_COLLECTION_SUMMARY OneTick event processor