def merge(sources, align_schema=True, symbols=None, identify_input_ts=False,
          presort=adaptive, concurrency=default, batch_size=default, output_type_index=None):
    """
    Merges ticks from the ``sources`` into a single output ordered by the timestamp.

    Parameters
    ----------
    sources : list
        List of sources to merge.
    align_schema : bool
        If set to True, then a table is added right after the merge.
        We recommend keeping it set to True to prevent problems with different tick schemas.
        Default: True
    symbols: str, list of str or functions, :class:`Source`
        Symbol(s) to run the query for, passed as a string, a list of strings, or as a "symbols" query
        whose results include the ``SYMBOL_NAME`` column. The start/end times for the symbols query
        will be taken from the :meth:`run` params. See :ref:`symbols <Symbols>` for more details.
    identify_input_ts: bool
        If set to False, the fields SYMBOL_NAME and TICK_TYPE are not appended to the output ticks.
    presort: bool
        Add the presort EP in case of bound symbols.
        Applicable only when ``symbols`` is not None.
        By default, it is set to True if ``symbols`` are set and to False otherwise.
    concurrency: int
        Specifies the number of CPU cores to utilize for the ``presort``.
        By default, the value from otp.config.default_concurrency is used.
    batch_size: int
        Specifies the query batch size for the ``presort``.
        By default, the value from otp.config.default_batch_size is used.
    output_type_index: int
        Specifies the index of the source in ``sources`` from which the type and properties
        of the output will be taken. Useful when merging sources that inherited from :class:`Source`.
        By default, the output object type will be :class:`Source`.

    Returns
    -------
    :class:`Source` or same class as ``sources[output_type_index]``
        A time series of ticks.

    See also
    --------
    **MERGE** OneTick event processor

    Examples
    --------

    ``merge`` is used to merge different data sources

    >>> data1 = otp.Ticks(X=[1, 2], Y=['a', 'd'])
    >>> data2 = otp.Ticks(X=[-1, -2], Y=['*', '-'])
    >>> data = otp.funcs.merge([data1, data2])  # OTdirective: snippet-name:merge.as list;
    >>> otp.run(data)
                         Time  X  Y
    0 2003-12-01 00:00:00.000  1  a
    1 2003-12-01 00:00:00.000 -1  *
    2 2003-12-01 00:00:00.001  2  d
    3 2003-12-01 00:00:00.001 -2  -

    Merge series from multiple symbols into one series

    >>> # OTdirective: snippet-name:merge.bound symbols;
    >>> data = otp.Ticks(X=[1])
    >>> data['SYMBOL_NAME'] = data['_SYMBOL_NAME']
    >>> symbols = otp.Ticks(SYMBOL_NAME=['A', 'B'])
    >>> data = otp.merge([data], symbols=symbols)
    >>> data()
            Time  X SYMBOL_NAME
    0 2003-12-01  1           A
    1 2003-12-01  1           B

    Adding symbols param before merge

    >>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'], param=[1, -1])
    >>> def func(symbol):
    ...     pre = otp.Ticks(X=[1])
    ...     pre["SYMBOL_NAME"] = symbol.name
    ...     pre["PARAM"] = symbol.param
    ...     return pre
    >>> data = otp.funcs.merge([func], symbols=symbols)
    >>> otp.run(data)[['PARAM', 'SYMBOL_NAME']]
       PARAM SYMBOL_NAME
    0      1          S1
    1     -1          S2
    """
    from onetick.py.core.source import _Source

    if not sources:
        raise ValueError("Merge should have one or more inputs")

    output_type = output_type_by_index(sources, output_type_index)

    if presort is adaptive:
        presort = True if symbols is not None else False
    if presort and not symbols:
        warnings.warn("The `presort` parameter takes effect only when "
                      "symbols are specified in the `symbols` parameter")
    if concurrency is not default and not presort:
        warnings.warn("The `concurrency` parameter takes effect only when "
                      "the `presort` parameter is set to True")
    if batch_size is not default and not presort:
        warnings.warn("The `batch_size` parameter takes effect only when "
                      "the `presort` parameter is set to True")
    if concurrency is default:
        concurrency = (config.default_concurrency
                       if config.default_concurrency is not None
                       # otq.Presort does not support None
                       else '')
    if batch_size is default:
        batch_size = config.default_batch_size

    def _base_ep_for_cross_symbol(symbol, tmp_otq):
        if presort and symbol:
            base_ep = otq.Presort(batch_size=batch_size, max_concurrency=concurrency)
        else:
            base_ep = otq.Merge(identify_input_ts=identify_input_ts)
        if symbol:
            if isinstance(symbol, (query, _QueryEvalWrapper)):
                symbol = symbol.to_eval_string(tmp_otq=tmp_otq)
            elif isinstance(symbol, _Source):
                symbol = symbol._convert_symbol_to_string(symbol, tmp_otq=tmp_otq)
            base_ep = base_ep.symbols(symbol)
        return base_ep

    def _evaluate_functions_in_sources_list(sources, symbols):
        result = []
        if not isinstance(sources, list):
            sources = [sources]
        for s in sources:
            if not isinstance(s, _Source) and callable(s):
                num_params = len(inspect.signature(s).parameters)
                if num_params == 0:
                    s = s()
                elif num_params == 1:
                    s = s(symbols.to_symbol_param()
                          if isinstance(symbols, (_Source, _QueryEvalWrapper))
                          else _SymbolParamSource())
                else:
                    raise ValueError(f"Only one parameter is expected in the callback, "
                                     f"but {num_params} were passed")  # TODO: test this case
            if isinstance(s, _Source):
                result.append(s)
            else:
                raise ValueError("Sources and functions (returning _source) are expected as preprocessors")
        return result

    sources = _evaluate_functions_in_sources_list(sources, symbols)

    need_table = False
    merged_columns, need_table, used_columns = _collect_merged_columns(need_table, sources)
    need_table = _is_table_after_merge_needed(need_table, used_columns)

    # we need to store internal graphs somewhere while we create base ep from eval
    intermediate_tmp_otq = TmpOtq()
    result = output_type(node=_base_ep_for_cross_symbol(symbols, tmp_otq=intermediate_tmp_otq),
                         **merged_columns)
    result._tmp_otq.merge(intermediate_tmp_otq)

    __copy_sources_on_merge_or_join(result, sources, symbols, output_type_index=output_type_index)

    if presort and symbols:
        result.sink(otq.Merge(identify_input_ts=identify_input_ts))

    if identify_input_ts:
        result[("SYMBOL_NAME", str)]
        result[("TICK_TYPE", str)]

    result = _add_table_after_merge(align_schema, merged_columns, need_table, result)
    result._fix_varstrings()
    return result
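
# A hedged usage sketch for ``identify_input_ts`` (not from the docstring above):
# with identify_input_ts=True the merged ticks carry SYMBOL_NAME and TICK_TYPE
# columns identifying the input time series each tick came from; the exact values
# depend on the symbols and tick types the query is run with.
#
#     >>> data1 = otp.Ticks(X=[1])
#     >>> data2 = otp.Ticks(X=[2])
#     >>> data = otp.merge([data1, data2], identify_input_ts=True)
#     >>> otp.run(data)  # result has extra SYMBOL_NAME and TICK_TYPE columns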
def _add_table_after_merge(add_table, merged_columns, need_table, result):
    if add_table and need_table:
        # a special case, when the add_table parameter is a list of common columns that should
        # be added to a final table
        # it is used internally
        if isinstance(add_table, list):
            merged_columns = {key: value for key, value in merged_columns.items() if key in add_table}
        if len(merged_columns):
            table = otq.Table(
                fields=",".join(type2str(dtype) + " " + name for name, dtype in merged_columns.items()),
                keep_input_fields=True,
            )
            result.sink(table)
    return result


def __copy_sources_on_merge_or_join(result, sources, symbols=None, names=None, drop_meta=False,
                                    leading=None, output_type_index=None, use_rename_ep=True):
    """
    Copy columns, state vars and other metadata from the joined/merged sources.

    Parameters
    ----------
    result: _Source
        Source object constructed as a join or merge operation,
        e.g. result = _Source(otq.Merge(sources))
    sources: list of _Source, tuple of _Source
        Sources that were joined or merged.
    symbols:
        Symbols to copy.
    names: list of str or None, tuple of str or None, bool, optional
        - If a collection of strings or Nones, add a passthrough ep with the given name
          to each source for which a name is specified, and add nothing when the
          corresponding item is None.
        - If True, autogenerate such names in the __SRC_{number}__ format.
        - If None or False, do not add passthrough eps and do not change node names.
    drop_meta : bool, optional
        If True, drop the TIMESTAMP and OMDSEQ fields.
    leading : List of str, Tuple of str, optional
        List of leading sources' names.
    output_type_index: int, optional
        Specifies the index of the source in `sources` from which the properties of `result`
        will be taken. Useful when merging sources that inherited from otp.Source.
    use_rename_ep: bool
        Use the RenameFields ep or not. This ep can't be used in generic aggregation.

    Returns
    -------
    None
        Modifies result directly.
    """
    from onetick.py.core.source import _Source

    result._copy_state_vars_from(sources)
    result._clean_sources_dates()  # because it is not a real _source

    for source in sources:
        result._merge_tmp_otq(source)
        if source.get_name():
            if not result.get_name():
                result.set_name(source.get_name())
            if result.get_name() != source.get_name():
                warnings.warn(f"Merging/joining sources with different names: '{result.get_name()}' "
                              f"and '{source.get_name()}'. Some of those names will be lost")

    if isinstance(symbols, _Source):
        result._merge_tmp_otq(symbols)

    names = __copy_and_rename_nodes_on_merge_join(result, names, sources, symbols)

    if drop_meta:
        to_drop = list(map(lambda x: x + ".TIMESTAMP", names))
        to_drop += list(map(lambda x: x + ".OMDSEQ", names))
        __rename_leading_omdseq(leading, names, result, sources, use_rename_ep=use_rename_ep)
        result.sink(otq.Passthrough(fields=",".join(to_drop), drop_fields=True))

    if output_type_index is not None:
        result._copy_properties_from(sources[output_type_index])


def __rename_fields(source, mapping, use_rename_ep=True):
    """
    Function to rename fields from ``mapping`` in ``source``.
    Note that it is a low-level function that doesn't change the python schema of the ``source``.
    Modifies ``source`` inplace, doesn't return anything.
    If ``use_rename_ep`` is `True`, then the RenameFields ep will be used.
    """
    if use_rename_ep:
        source.sink(otq.RenameFields(','.join(f'{k}={v}' for k, v in mapping.items())))
        return
    # May be needed, because the RenameFields ep is not supported in generic aggregation
    for old, new in mapping.items():
        # RenameFields ignores non-existent fields,
        # all this mess is needed to mimic that logic
        source.sink(otq.WhereClause(where=f'UNDEFINED("{old}")'))
        if_branch_graph = source.node().copy_graph()
        if_branch_rules = source.node().copy_rules()
        source.sink(otq.AddField(new, old), out_pin='ELSE')
        source.sink(otq.Passthrough(old, drop_fields=True))
        source.sink(otq.Merge(identify_input_ts=False))
        source.source(if_branch_graph)
        source.node().add_rules(if_branch_rules)


def __rename_leading_omdseq(leading, names, result, sources, use_rename_ep=True):
    if leading is not None:
        if len(leading) == 1:
            leading = leading.pop()
            __rename_fields(result, {f"{leading}.OMDSEQ": "OMDSEQ"}, use_rename_ep=use_rename_ep)
        else:
            number, indexes = __get_number_and_indexes_of_sources_have_field(sources, "OMDSEQ")
            if number == 1:
                __rename_fields(result, {f"{names[indexes.pop()]}.OMDSEQ": "OMDSEQ"},
                                use_rename_ep=use_rename_ep)
            elif number:
                raise ValueError("Several sources were specified as leading and the OMDSEQ field is present "
                                 "in more than one source. The resulting OMDSEQ can't be derived in this case.")


def __get_number_and_indexes_of_sources_have_field(sources, field):
    number = 0
    indexes = []
    for s in sources:
        if field in s.columns():
            indexes.append(number)
            number += 1
    return number, indexes


def __copy_and_rename_nodes_on_merge_join(result, names, sources, symbols):
    # shared eps between sources
    eps = defaultdict()
    if names is True:
        names = [f"__SRC_{n}__" for n in range(len(sources))]
    if not names:
        names = itertools.repeat(None)
    if sources:
        for name, src in zip(names, sources):
            obj = src
            if name:
                obj = src.copy()
                obj.sink(otq.Passthrough())
                obj.node_name(name)
            result.source(obj.node().copy_graph(eps))
            result.node().add_rules(obj.node().copy_rules())
            result._set_sources_dates(obj, copy_symbols=not bool(symbols))
    return names


def _is_table_after_merge_needed(need_table, used_columns):
    if not need_table:
        for key, value in used_columns.items():
            if not value:
                need_table = True
                break
    return need_table


def _collect_merged_columns(need_table, sources):
    merged_columns = sources[0].columns(skip_meta_fields=True)
    used_columns = {key: False for key in merged_columns.keys()}
    for src in sources[1:]:
        for key, value in src.columns(skip_meta_fields=True).items():
            if key in merged_columns:
                orig_type = merged_columns[key]
                try:
                    merged_dtype, merged_need_table = get_type_that_includes([orig_type, value])
                except ValueError as e:
                    raise ValueError(f"Column '{key}' has different types for "
                                     f"different branches: {orig_type}, {value}") from e
                need_table |= merged_need_table
                merged_columns[key] = merged_dtype
            else:
                need_table = True
                merged_columns[key] = value
            if key in used_columns:
                used_columns[key] = True
    return merged_columns, need_table, used_columns


def concat(sources=None, add_table=True, symbols=None):
    """
    Deprecated: Merges ticks from the sources into a single output _source ordered by the timestamp.

    This function is deprecated because of its incorrect name. Use 'merge' instead.

    Parameters
    ----------
    sources : list
        List of sources to merge.
    add_table : bool
        If set to True, then a table is added right after the merge.
        We recommend keeping it set to True to prevent problems with different tick schemas.
        Default: True

    Returns
    -------
    A new _source that holds the result of the merged sources.
    """
    warnings.warn("This function is deprecated because of its incorrect name. Use `merge` instead.",
                  DeprecationWarning)
    return merge(sources=sources, align_schema=add_table, symbols=symbols)
def join(left, right, on, how="outer", rprefix="RIGHT", keep_fields_not_in_schema=False, output_type_index=None):
    """
    Joins two sources ``left`` and ``right`` based on the ``on`` condition.

    If you want to add a prefix/suffix to all columns in one of the sources, use
    :func:`Source.add_prefix` or :func:`Source.add_suffix`.

    Parameters
    ----------
    left: :class:`Source`
        Left source to join.
    right: :class:`Source`
        Right source to join.
    on: :py:class:`~onetick.py.Operation` or 'all' or 'same_size'
        If 'all', joins every tick from ``left`` with every tick from ``right``.
        If 'same_size', and the sizes of the sources are the same, joins ticks from
        the two sources directly, otherwise raises an exception.
    how: 'inner' or 'outer'
        Joining type. An inner join will only produce ticks that matched the ``on`` condition.
        An outer join will also produce the ticks from the ``left`` source that didn't match
        the condition (so it's basically a left-outer join).
        Doesn't matter for ``on='all'`` and ``on='same_size'``.
    rprefix: str
        The name of the ``right`` data source. It will be added as a prefix to the
        overlapping columns that come from ``right`` into the result.
    keep_fields_not_in_schema: bool
        If True, the join function will try to preserve any fields of the original sources
        that are not in the source schema, propagating them to the output. This means
        a possibility of a runtime error if fields are duplicated.
        If False, all fields that are not in the schema are removed.
    output_type_index: int
        Specifies the index of the source in sources from which the type and properties
        of the output will be taken.
        Useful when joining sources that inherited from :class:`Source`.
        By default, the output object type will be :class:`Source`.

    Returns
    -------
    :class:`Source` or same class as ``[left, right][output_type_index]``
        Joined data.

    See also
    --------
    **JOIN** OneTick event processor

    Examples
    --------
    >>> d1 = otp.Ticks({'ID': [1, 2, 3], 'A': ['a', 'b', 'c']})
    >>> d2 = otp.Ticks({'ID': [2, 3, 4], 'B': ['q', 'w', 'e']})

    Outer join:

    >>> otp.join(d1, d2, on=d1['ID'] == d2['ID'], how='outer')()
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.000   1  a         0
    1 2003-12-01 00:00:00.001   2  b         2  q
    2 2003-12-01 00:00:00.002   3  c         3  w

    Inner join:

    >>> otp.join(d1, d2, on=d1['ID'] == d2['ID'], how='inner')()
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.001   2  b         2  q
    1 2003-12-01 00:00:00.002   3  c         3  w

    Join all ticks:

    >>> otp.join(d1, d2, on='all')()
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.000   1  a         2  q
    1 2003-12-01 00:00:00.000   1  a         3  w
    2 2003-12-01 00:00:00.000   1  a         4  e
    3 2003-12-01 00:00:00.001   2  b         2  q
    4 2003-12-01 00:00:00.001   2  b         3  w
    5 2003-12-01 00:00:00.001   2  b         4  e
    6 2003-12-01 00:00:00.002   3  c         2  q
    7 2003-12-01 00:00:00.002   3  c         3  w
    8 2003-12-01 00:00:00.002   3  c         4  e

    Join same size sources:

    >>> otp.join(d1, d2, on='same_size')()
                         Time  ID  A  RIGHT_ID  B
    0 2003-12-01 00:00:00.000   1  a         2  q
    1 2003-12-01 00:00:00.001   2  b         3  w
    2 2003-12-01 00:00:00.002   3  c         4  e

    Adding a prefix to the right source for all columns:

    >>> d2 = d2.add_prefix('right_')
    >>> otp.join(d1, d2, on=d1['ID'] == d2['right_ID'])()
                         Time  ID  A  right_ID  right_B
    0 2003-12-01 00:00:00.000   1  a         0
    1 2003-12-01 00:00:00.001   2  b         2        q
    2 2003-12-01 00:00:00.002   3  c         3        w
    """
    output_type = output_type_by_index((left, right), output_type_index)

    timezone_hack = None
    if re.search(r'\b_TIMEZONE\b', str(on)):
        # join does not support using the _TIMEZONE pseudo-field in join_criteria,
        # replacing it with temporary fields in the branches
        timezone_hack = '__TIMEZONE_HACK__'
        left[timezone_hack] = left['_TIMEZONE']
        right[timezone_hack] = right['_TIMEZONE']

    if str(on) == "all":
        on = f'1 = 1 or {rprefix}.TIMESTAMP >= 0'

    _LEFT_NODE_NAME = "__SRC_LEFT__"  # this is an internal name
    _RIGHT_NODE_NAME = rprefix

    initial_left_use_name_for_column_prefix = left.use_name_for_column_prefix()
    initial_right_use_name_for_column_prefix = right.use_name_for_column_prefix()
    initial_left_source_node_name = left.node_name()
    initial_right_source_node_name = right.node_name()

    # we have to add the _source prefix to all column operations;
    # the `on` expression is written with these prefixes, so we modify
    # the sources here and restore them later
    left.use_name_for_column_prefix(True)
    left.node_name(_LEFT_NODE_NAME)
    right.use_name_for_column_prefix(True)  # add prefix to every operation with that table in `on`
    right.node_name(_RIGHT_NODE_NAME)

    columns_name_set = set()
    columns = {}
    fields_to_skip_right_source = {'TIMESTAMP'}
    for name, dtype in chain(left.columns(skip_meta_fields=True).items(),
                             right.columns(skip_meta_fields=True).items()):
        if name in columns_name_set:
            columns[_RIGHT_NODE_NAME + "_" + name] = dtype
            fields_to_skip_right_source.add(name)
        else:
            columns[name] = dtype
            columns_name_set.add(name)

    if how == "outer":
        join_type = "LEFT_OUTER"
    elif how == "inner":
        join_type = "INNER"
    else:
        raise ValueError("The 'how' parameter has a wrong value. Only 'outer' and 'inner' are supported")

    if timezone_hack:
        on = re.sub(r'\._TIMEZONE\b', f'.{timezone_hack}', str(on))
        on = re.sub(r'\b_TIMEZONE\b', f'{_LEFT_NODE_NAME}.{timezone_hack}', str(on))

    # ------------------
    # create objects
    params = {"join_criteria": str(on), "join_type": join_type, "left_source": _LEFT_NODE_NAME}

    # restore the original states of the sources
    left.use_name_for_column_prefix(initial_left_use_name_for_column_prefix)
    left.node_name(initial_left_source_node_name)
    right.use_name_for_column_prefix(initial_right_use_name_for_column_prefix)
    right.node_name(initial_right_source_node_name)

    if str(on) == "same_size":
        result = output_type(node=otq.JoinSameSizeTs(), **columns)
    else:
        result = output_type(node=otq.Join(**params), **columns)

    __copy_sources_on_merge_or_join(result, (left, right), names=(_LEFT_NODE_NAME, _RIGHT_NODE_NAME),
                                    output_type_index=output_type_index)

    rename_fields_dict = {}
    for lc, rc in zip_longest(left.columns(skip_meta_fields=True), right.columns(skip_meta_fields=True)):
        if lc:
            rename_fields_dict[f"{_LEFT_NODE_NAME}.{lc}"] = lc
        if rc:
            if rc not in fields_to_skip_right_source:
                rename_fields_dict[f"{_RIGHT_NODE_NAME}.{rc}"] = rc
            else:
                rename_fields_dict[f"{_RIGHT_NODE_NAME}.{rc}"] = f"{_RIGHT_NODE_NAME}_{rc}"
    __rename_fields(result, rename_fields_dict)

    result.sink(otq.Passthrough(fields=_LEFT_NODE_NAME + ".TIMESTAMP", drop_fields=True))

    items = []
    for name, dtype in result.columns(skip_meta_fields=True).items():
        items.append(type2str(dtype) + " " + name)

    if keep_fields_not_in_schema:
        # Here we try to preserve fields of the original sources that were not in the schema
        # in their original form. If fields are duplicated or any other problem appears
        # at runtime, there is nothing we can do about it here.
        result.sink(otq.Passthrough(fields=_RIGHT_NODE_NAME + ".TIMESTAMP", drop_fields=True))
        result.sink(otq.RenameFieldsEp(
            rename_fields=rf"{_LEFT_NODE_NAME}\.(.*)=\1,{_RIGHT_NODE_NAME}\.(.*)=\1",
            use_regex=True))
        result.sink(otq.Table(fields=",".join(items), keep_input_fields=True))
    else:
        result.sink(otq.Table(fields=",".join(items)))

    if timezone_hack:
        result = result.drop([field for field in result.schema if field.endswith(timezone_hack)])

    return result
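
# A hedged sketch (not from the docstring above): since ``on`` is an arbitrary
# :py:class:`~onetick.py.Operation`, join conditions other than plain equality
# should work as well, e.g. an inequality between columns of the two sources.
#
#     >>> d1 = otp.Ticks({'ID': [1, 2, 3], 'A': ['a', 'b', 'c']})
#     >>> d2 = otp.Ticks({'ID': [2, 3, 4], 'B': ['q', 'w', 'e']})
#     >>> data = otp.join(d1, d2, on=d1['ID'] < d2['ID'], how='inner')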
def join_by_time(sources, how="outer", on=None, policy=None, check_schema=True, leading=0,
                 match_if_identical_times=None, output_type_index=None, use_rename_ep=True):
    """
    Joins ticks from multiple input time series, based on input tick timestamps.

    A tick from the ``leading`` source is joined with the ticks that have already
    arrived from the other sources.

    >>> leading = otp.Ticks(A=[1, 2], offset=[1, 3])
    >>> other = otp.Ticks(B=[1], offset=[2])
    >>> otp.join_by_time([leading, other])()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  0
    1 2003-12-01 00:00:00.003  2  1

    If you want to add a prefix/suffix to all columns in one of the sources, use
    :func:`Source.add_prefix` or :func:`Source.add_suffix`.

    Parameters
    ----------
    sources: Collection[:class:`Source`]
        The collection of Source objects which will be joined.
    how: 'outer' or 'inner'
        The method of join (inner or outer).
    on: Collection[:class:`Column`]
        ``on`` adds an extra check to the join - only ticks with the same values
        of the ``on`` fields will be joined.

        >>> leading = otp.Ticks(A=[1, 2], offset=[1, 3])
        >>> other = otp.Ticks(A=[2, 2], B=[1, 2], offset=[0, 2])
        >>> otp.join_by_time([leading, other], on=['A'])()
                             Time  A  B
        0 2003-12-01 00:00:00.001  1  0
        1 2003-12-01 00:00:00.003  2  2

    policy: 'arrival_order', 'latest_ticks', 'each_for_leader_with_first' or 'each_for_leader_with_latest'
        Policy of joining ticks with the same timestamps.

        >>> leading = otp.Ticks(A=[1, 2], offset=[0, 0], OMDSEQ=[0, 3])
        >>> other = otp.Ticks(B=[1, 2], offset=[0, 0], OMDSEQ=[2, 4])

        Note: in the examples below we assume that all ticks have the same timestamp,
        with the order of ticks as in the example. OMDSEQ is a special field that stores
        the order of ticks with the same timestamp.

        - ``arrival_order``
          An output tick is generated on arrival of a ``leading`` source tick.

          >>> data = otp.join_by_time([leading, other], policy='arrival_order')
          >>> data()[['Time', 'A', 'B']]
                   Time  A  B
          0 2003-12-01  1  0
          1 2003-12-01  2  1

        - ``latest_ticks``
          A tick is generated at the time of expiration of a particular timestamp
          (when all ticks from all sources for the current timestamp have arrived).
          Only the latest tick from the ``leading`` source will be used.

          >>> data = otp.join_by_time([leading, other], policy='latest_ticks')
          >>> data()[['Time', 'A', 'B']]
                   Time  A  B
          0 2003-12-01  2  2

        - ``each_for_leader_with_first``
          Each tick from the ``leading`` source will be joined with the first tick
          from the other sources for the current timestamp.

          >>> data = otp.join_by_time(
          ...     [leading, other],
          ...     policy='each_for_leader_with_first'
          ... )
          >>> data()[['Time', 'A', 'B']]
                   Time  A  B
          0 2003-12-01  1  1
          1 2003-12-01  2  1

        - ``each_for_leader_with_latest``
          Each tick from the ``leading`` source will be joined with the last tick
          from the other sources for the current timestamp.

          >>> data = otp.join_by_time(
          ...     [leading, other],
          ...     policy='each_for_leader_with_latest'
          ... )
          >>> data()[['Time', 'A', 'B']]
                   Time  A  B
          0 2003-12-01  1  2
          1 2003-12-01  2  2

    check_schema: bool
        If True, onetick.py checks that all column names are unambiguous and that the columns
        listed in the ``on`` parameter exist in the sources' schemas. This can lead to
        false-positive errors if some event processors were sunk to a Source directly;
        set ``check_schema`` to False to avoid this.
    leading: int, 'all', :class:`Source`, list of int, list of :class:`Source`
        A source, a list of sources, or their indexes. If this parameter is 'all',
        every source is considered to be leading.
    match_if_identical_times: bool
        A True value of this parameter causes an output tick to be formed from input ticks
        with identical timestamps only.
        If ``how`` is set to 'outer', default values of fields (``otp.nan``, 0, empty string)
        are propagated for sources that did not tick at a given timestamp.
        If this parameter is set to True, the default value of the ``policy`` parameter
        is set to 'latest_ticks'.
    output_type_index: int
        Specifies the index of the source in ``sources`` from which the type and properties
        of the output will be taken.
        Useful when joining sources that inherited from :class:`Source`.
        By default, the output object type will be :class:`Source`.
    use_rename_ep: bool
        Use the RenameFields ep or not. This ep can't be used in generic aggregation.

    See also
    --------
    **JOIN_BY_TIME** OneTick event processor

    Examples
    --------
    >>> d1 = otp.Ticks({'A': [1, 2, 3], 'offset': [1, 2, 3]})
    >>> d2 = otp.Ticks({'B': [1, 2, 3], 'offset': [1, 2, 4]})
    >>> otp.join_by_time([d1, d2])()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  0
    1 2003-12-01 00:00:00.002  2  1
    2 2003-12-01 00:00:00.003  3  2

    >>> otp.join_by_time([d1, d2], leading=1)()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  1
    1 2003-12-01 00:00:00.002  2  2
    2 2003-12-01 00:00:00.004  3  3

    >>> otp.join_by_time([d1, d2], leading=1, match_if_identical_times=True)()
                         Time  A  B
    0 2003-12-01 00:00:00.001  1  1
    1 2003-12-01 00:00:00.002  2  2
    2 2003-12-01 00:00:00.004  0  3

    Adding a prefix to the right source for all columns:

    >>> otp.join_by_time([d1, d2.add_prefix('right_')])()
                         Time  A  right_B
    0 2003-12-01 00:00:00.001  1        0
    1 2003-12-01 00:00:00.002  2        1
    2 2003-12-01 00:00:00.003  3        2

    Returns
    -------
    :class:`Source` or same class as ``sources[output_type_index]``
        A time series of ticks.
    """
    output_type = output_type_by_index(sources, output_type_index)

    join_str_keys = []
    # if a key is set, then generalize it, ie convert into a list;
    # then remove keys from the 'columns_count' dict to pass validation after
    if on is not None:
        if isinstance(on, list):
            pass
        elif isinstance(on, Column):
            on = [on]
        elif isinstance(on, str):
            on = [on]
        else:
            raise TypeError(f"It is not supported to have '{type(on)}' type as a key")
        for join_key in on:
            dtypes = set()
            if check_schema:
                for source in sources:
                    try:
                        key_type = source.schema[str(join_key)]
                    except KeyError as e:
                        raise KeyError(f"Column '{join_key}' not found in source schema {source}") from e
                    dtypes.add(type2str(key_type))
                if len(dtypes) > 1:
                    raise TypeError(f"Column '{join_key}' has different types in sources: {dtypes}")
            if isinstance(join_key, Column):
                join_str_keys.append(str(join_key))
            elif isinstance(join_key, str):
                join_str_keys.append(join_key)
        if check_schema:
            _check_schema_for_join_by_time(join_str_keys, sources)

    if how not in ["inner", "outer"]:
        raise Exception('Wrong value for the "how" parameter. It is allowed to use "inner" or "outer" values')
    join_type = how.upper()

    # ------------------
    # create objects
    params = {"add_source_prefix": False, "join_type": join_type}

    leading = _fill_leading_sources_param(leading, params, sources)

    if on is not None:
        params["join_keys"] = ",".join(join_str_keys)
    if policy is not None:
        policies = {"arrival_order", "latest_ticks", "each_for_leader_with_first", "each_for_leader_with_latest"}
        if policy.lower() not in policies:
            raise ValueError("Invalid policy. Only the following ones are allowed: " + ", ".join(policies) + ".")
        params["same_timestamp_join_policy"] = policy.upper()
    if match_if_identical_times is not None:
        params["match_if_identical_times"] = match_if_identical_times

    columns = {name: dtype for src in sources for name, dtype in src.columns(skip_meta_fields=True).items()}
    result = output_type(node=otq.JoinByTime(**params), **columns)

    __copy_sources_on_merge_or_join(result, sources, names=True, drop_meta=True, leading=leading,
                                    output_type_index=output_type_index, use_rename_ep=use_rename_ep)

    if how == "outer":
        items = []
        for name, dtype in result.columns(skip_meta_fields=True).items():
            items.append(type2str(dtype) + " " + name)
        result.sink(otq.Table(fields=",".join(items), keep_input_fields=True))

    return result
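
# A hedged sketch for ``how='inner'`` (grounded in the parameter description above):
# an inner join drops leading ticks that found no match in the other sources instead
# of joining them with default values. For example, the first leading tick from the
# outer-join example above, which was joined with a default B, would not be produced.
#
#     >>> d1 = otp.Ticks({'A': [1, 2, 3], 'offset': [1, 2, 3]})
#     >>> d2 = otp.Ticks({'B': [1, 2, 3], 'offset': [1, 2, 4]})
#     >>> data = otp.join_by_time([d1, d2], how='inner')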
@singledispatch
def _fill_leading_sources_param(leading, params, sources):
    from onetick.py.core.source import _Source

    if isinstance(leading, _Source):
        # TODO: PY-104 Get rid of circular dependencies in code to avoid local import
        result = f"__SRC_{__find_by_id(sources, leading)}__"
        params["leading_sources"] = result
        result = [result]
    elif leading == "all":
        # all sources are leading, which is specified by an empty string
        params["leading_sources"] = ""
        result = []
    else:
        raise ValueError("A wrong leading param was specified, please use any of int, the 'all' literal, "
                         "list of int, list of _Source")
    return result


@_fill_leading_sources_param.register(int)
def _(leading, params, sources):
    if leading < 0:
        leading = len(sources) + leading
    if 0 <= leading < len(sources):
        result = f"__SRC_{leading}__"
        params["leading_sources"] = result
        return [result]
    else:
        raise ValueError(f"The leading source index should be in range(-len(sources), len(sources)), "
                         f"but {leading} was specified.")


@_fill_leading_sources_param.register(list)  # type: ignore  # _ already defined above
@_fill_leading_sources_param.register(tuple)
def _(leading, params, sources):
    if len(leading) > len(sources):
        raise ValueError("The number of leading sources can't be bigger than the number of sources")
    elif len(leading) == len(sources):
        warnings.warn("You've specified leading and source lists of the same size; "
                      "you can specify leading='all' instead")
    if isinstance(leading[0], int):
        result = leading
    else:
        result = [__find_by_id(sources, lead) for lead in leading]
    indexes = ",".join(f"__SRC_{i}__" for i in result)
    params["leading_sources"] = indexes
    return result


def __find_by_id(collection, item):
    for index, s in enumerate(collection):
        if s is item:
            return index
    raise ValueError("The leading source should be in the join sources list")


def _check_schema_for_join_by_time(join_str_keys, sources):
    # check that there aren't matching columns
    columns_count = Counter()
    for src in sources:
        columns_count.update(src.columns(skip_meta_fields=True).keys())
    for join_key in join_str_keys:
        del columns_count[join_key]
    matched = [k for k, value in columns_count.items() if value > 1]
    if "OMDSEQ" in matched:
        # OMDSEQ behaves like the TIMESTAMP field
        matched.remove("OMDSEQ")
    if len(matched):
        raise Exception(f"There are matching columns between sources: {','.join(matched)}")


def apply_query(query, in_sources=None, output_pins=None, shared_state_variables_list=None,
                output_type_index=None, **params):
    from onetick.py.sources import query as otp_query

    output_type = output_type_by_index(in_sources, output_type_index)
    output_pins = output_pins if output_pins else []
    in_sources = in_sources if in_sources else {}
    shared_state_variables_list = shared_state_variables_list if shared_state_variables_list else []
    if isinstance(query, str):
        # it seems that a path is passed
        query = otp_query(query, **params)
    elif isinstance(query, otp_query):
        if params:
            query.update_params(**params)
    # elif
    columns = {}
    for src in in_sources.values():
        columns.update(src.columns(skip_meta_fields=True))
    str_params = query.str_params
    shared_state_variables = ",".join(shared_state_variables_list)

    inputs_need_unbound_symbols = {in_pin: src._is_unbound_required() for in_pin, src in in_sources.items()}
    if query.graph_info.has_unbound_if_pinned(inputs_need_unbound_symbols):
        symbol = adaptive
    else:
        symbol = None

    nested_src = output_type(
        node=otq.NestedOtq(query.path, str_params, shared_state_variables=shared_state_variables),
        _has_output=len(output_pins) > 0,
        _symbols=symbol,
        **columns,
    )

    eps = defaultdict()
    for in_pin, src in in_sources.items():
        nested_src.source(src.node().copy_graph(eps), in_pin)
        nested_src.node().add_rules(src.node().copy_rules())
        nested_src._set_sources_dates(src)
        nested_src._merge_tmp_otq(src)

    if len(output_pins) == 0:
        # output_pins = ['OUT']
        return nested_src
    if len(output_pins) > 1:
        result = []
        for out_pin in output_pins:
            res_src = nested_src.copy()
            res_src.node().out_pin(out_pin)
            # NOTE: need to comment out this node
            res_src.sink(otq.Passthrough())
            # apply config customization
            query.config.apply(out_pin, res_src)
            result.append(res_src)
        return tuple(result)
    else:
        # TODO: move setting out_pin to the creation step of nested_src.
        # It does not seem to work now, because .copy() of _Source apparently
        # doesn't copy the out_pin reference, need to check
        nested_src.node().out_pin(output_pins[0])
        # apply config customization
        query.config.apply(output_pins[0], nested_src)
        return nested_src


def apply(query, *args, **kwargs):
    return apply_query(query.path, *args, **kwargs, **query.params)
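
# A hedged usage sketch for ``apply_query`` (the path and pin names below are
# hypothetical): applies a nested .otq query to one or more input sources and
# returns a source per requested output pin.
#
#     >>> data = otp.Ticks(X=[1, 2])
#     >>> result = apply_query('/path/to/query.otq',      # hypothetical path
#     ...                      in_sources={'IN': data},   # hypothetical input pin
#     ...                      output_pins=['OUT'])       # hypothetical output pin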
def cut(column: 'Column', bins: Union[int, List[float]], labels: List[str] = None):
    """
    Bin values into discrete intervals (mimics :pandas:`pandas.cut`).

    Parameters
    ----------
    column: :py:class:`~onetick.py.Column`
        Column with numeric data used to build bins.
    bins: int or List[float]
        When List[float] - defines the bin edges.
        When int - defines the number of equal-width bins in the range of the column values.
    labels: List[str]
        Labels used to name the resulting bins.
        If not set, bins are numeric intervals like (5.0000000000, 7.5000000000].

    Returns
    -------
    object that can be set to :py:class:`~onetick.py.Column`
    via :py:meth:`~onetick.py.Source.__setitem__`

    Examples
    --------
    >>> # OTdirective: snippet-name: Source.functions.cut;
    >>> data = otp.Ticks({"X": [9, 8, 5, 6, 7, 0, ]})
    >>> data['bin'] = otp.cut(data['X'], bins=3, labels=['a', 'b', 'c'])
    >>> data.to_df()[['X', 'bin']]
       X bin
    0  9   c
    1  8   c
    2  5   b
    3  6   b
    4  7   c
    5  0   a
    """
    src = column.obj_ref
    return _CutBuilder(src, column, bins, labels=labels)
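
# A hedged sketch for explicit bin edges (grounded in the ``bins`` description
# above; the edge values and labels are illustrative):
#
#     >>> data = otp.Ticks({"X": [9, 8, 5, 6, 7, 0]})
#     >>> data['bin'] = otp.cut(data['X'], bins=[0, 5, 10], labels=['low', 'high'])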
def qcut(column: 'Column', q: Union[int, List[float]], labels: List[str] = None):
    """
    Quantile-based discretization function (mimics :pandas:`pandas.qcut`).

    Parameters
    ----------
    column: :py:class:`~onetick.py.Column`
        Column with numeric data used to build bins.
    q: int or List[float]
        When List[float] - array of quantiles, e.g. [0, .25, .5, .75, 1.] for quartiles.
        When int - number of quantiles. 10 for deciles, 4 for quartiles, etc.
    labels: List[str]
        Labels used to name the resulting bins.
        If not set, bins are numeric intervals like (5.0000000000, 7.5000000000].

    Returns
    -------
    object that can be set to :py:class:`~onetick.py.Column`
    via :py:meth:`~onetick.py.Source.__setitem__`

    Examples
    --------
    >>> # OTdirective: snippet-name: Source.functions.qcut;
    >>> data = otp.Ticks({"X": [10, 3, 5, 6, 7, 1]})
    >>> data['bin'] = otp.qcut(data['X'], q=3, labels=['a', 'b', 'c'])
    >>> data.to_df()[['X', 'bin']]
        X bin
    0  10   c
    1   3   a
    2   5   b
    3   6   b
    4   7   c
    5   1   a
    """
    # TODO when q is a List[float] like [0, .25, .5, .75, 1.]
    src = column.obj_ref
    return _QCutBuilder(src, column, q, labels=labels)
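
# A hedged sketch for quartiles (grounded in the ``q`` description above; note
# the TODO in the body: the List[float] form of ``q`` may not be fully supported
# yet, so an integer is used here):
#
#     >>> data = otp.Ticks({"X": [10, 3, 5, 6, 7, 1]})
#     >>> data['quartile'] = otp.qcut(data['X'], q=4, labels=['q1', 'q2', 'q3', 'q4'])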
def coalesce(sources, max_source_delay: float = 0.0, output_type_index: int = None):
    """
    Used to fill the gaps in one time series with the ticks from one or several other time series.

    This event processor considers ticks that arrive from several sources at the same time
    as being the same, allowing for possible delay across the sources when determining
    whether the ticks are the same. When the same tick arrives from several sources, it is
    only propagated from the source that has the highest priority among those sources.

    Input ticks do not necessarily have the same structure - they can have different fields.
    In order to distinguish time series, the event processor adds the SYMBOL_NAME field.
    Also, a SOURCE field is added to each tick that lacks it, to identify the source from
    which the tick is coming. Hence, one must avoid adding a SOURCE field in event
    processors positioned after COALESCE.

    Parameters
    ----------
    sources: list of :class:`Source`
        List of the sources to coalesce.
        Also, this list is treated as the priority order. The first member of the list
        has the highest priority when determining whether ticks are the same.
    max_source_delay: float
        The maximum time in seconds by which a tick from one input time series can arrive
        later than the same tick from another time series.
    output_type_index: int
        Specifies the index of the source in ``sources`` from which the type and properties
        of the output will be taken.
        Useful when merging sources that inherited from :class:`Source`.
        By default, the output object type will be :class:`Source`.

    Returns
    -------
    :class:`Source`
        A time series of ticks.

    See also
    --------
    **COALESCE** OneTick event processor

    Examples
    --------

    If ticks from different sources have the same time, only the tick from the source
    with the highest priority will be propagated.

    >>> data1 = otp.Ticks(A=[1, 2])
    >>> data2 = otp.Ticks(A=[3, 4])
    >>> data = otp.coalesce([data2, data1])
    >>> otp.run(data)[['Time', 'A']]
                         Time  A
    0 2003-12-01 00:00:00.000  3
    1 2003-12-01 00:00:00.001  4

    We can use the ``max_source_delay`` parameter to expand the time interval in which
    ticks are considered to have the "same time".

    >>> data1 = otp.Ticks({
    ...     'A': [1, 2, 3],
    ...     'offset': [0, 3000, 6000],
    ... })
    >>> data2 = otp.Ticks({
    ...     'A': [4, 5, 6],
    ...     # 4 is delayed by less than one second from 1
    ...     # 5 is delayed by one second from 2
    ...     # 6 is delayed by more than one second from 3
    ...     'offset': [999, 4000, 7001],
    ... })
    >>> data = otp.coalesce([data2, data1], max_source_delay=1)
    >>> otp.run(data)[['Time', 'A']]
                         Time  A
    0 2003-12-01 00:00:00.999  4
    1 2003-12-01 00:00:04.000  5
    2 2003-12-01 00:00:06.000  3
    3 2003-12-01 00:00:07.001  6
    """
    if not sources:
        raise ValueError("Coalesce should have one or more inputs")
    output_type = output_type_by_index(sources, output_type_index)

    # change node names for sources, the COALESCE ep needs them
    new_node_names = [f'__COALESCE_SRC_{i}__' for i, source in enumerate(sources, start=1)]
    node = otq.Coalesce(
        priority_order=','.join(new_node_names),
        max_source_delay=max_source_delay,
    )

    columns = {
        # these fields will be added by the COALESCE ep
        'SYMBOL_NAME': str,
        'TICK_TYPE': str,
    }
    for source in sources:
        for name in ['SYMBOL_NAME', 'TICK_TYPE']:
            if name in source.schema:
                raise ValueError(f"Field with name '{name}' is already present in the source. "
                                 "Please, rename or delete that field prior to invoking coalesce().")
        shared_columns = set(source.schema).intersection(columns)
        for name in shared_columns:
            type_1, type_2 = source.schema[name], columns[name]
            if type_1 != type_2:
                raise ValueError(f"Conflicting types for field '{name}' in different sources: {type_1}, {type_2}")
        columns.update(source.schema)

    # TODO: do we need field SOURCE (especially when node names are auto-generated)?
    # this field will be added by COALESCE if it's not present in the sources
    columns.setdefault('SOURCE', str)

    result = output_type(node, **columns)
    __copy_sources_on_merge_or_join(result, sources, names=new_node_names,
                                    output_type_index=output_type_index)
    return result
def corp_actions(
    source,
    adjustment_date: Union[date, int, str, None] = None,
    adjustment_date_tz: str = default,
    fields=None,
    adjust_rule="PRICE",
    apply_split: bool = True,
    apply_spinoff: bool = False,
    apply_cash_dividend: bool = False,
    apply_stock_dividend: bool = False,
    apply_security_splice: bool = False,
    apply_others: str = "",
    apply_all: bool = False,
):
    """Adjusts values using corporate actions information loaded into OneTick
    from the reference data file.

    To use it, the location of the reference database must be specified
    via the OneTick configuration.

    Parameters
    ----------
    source : onetick.py.Source
        Source object adjusted by corporate actions information.
    adjustment_date : otp.date, int, str, None, optional
        The date as of which the values are adjusted. The `int` format is YYYYMMDD.
        If it is not set, the values are adjusted as of the end date in the query.
        All corporate actions of the types specified in the parameters that lie between
        the tick timestamp and the adjustment date will be applied to each tick.
        Notice that the adjustment date is affected neither by _SYMBOL_PARAM._PARAM_END_TIME
        nor by the apply_times_daily setting.
        By default None
    adjustment_date_tz : str, optional
        Timezone for the adjustment date; by default the global `otp.config.tz` value is used.
        A local timezone can't be used, so in that case the parameter is set to 'GMT'.
    fields : str, optional
        A comma-separated list of fields to be adjusted.
        If this parameter is not set, some default adjustments will take place
        if appropriately named fields exist in the tick:

        - If the ADJUST_RULE parameter is set to PRICE, and the PRICE field is present,
          it will get adjusted. If the fields ASK_PRICE or BID_PRICE are present,
          they will get adjusted. If the fields ASK_VALUE or BID_VALUE are present,
          they will get adjusted.
        - If the ADJUST_RULE parameter is set to SIZE, and the SIZE field is present,
          it will get adjusted. If the fields ASK_SIZE or BID_SIZE are present,
          they will get adjusted. If the fields ASK_VALUE or BID_VALUE are present,
          they will get adjusted.

        By default None
    adjust_rule : str, optional
        When set to PRICE, adjustments are applied under the assumption that the fields
        to be adjusted contain prices (the adjustment direction is determined appropriately).
        When set to SIZE, adjustments are applied under the assumption that the fields
        contain sizes (the adjustment direction is opposite to that when the parameter's
        value is PRICE).
        By default "PRICE"
    apply_split : bool, optional
        If true, adjustments for splits are applied, by default `True`
    apply_spinoff : bool, optional
        If true, adjustments for spin-offs are applied, by default `False`
    apply_cash_dividend : bool, optional
        If true, adjustments for cash dividends are applied, by default `False`
    apply_stock_dividend : bool, optional
        If true, adjustments for stock dividends are applied, by default `False`
    apply_security_splice : bool, optional
        If true, adjustments for security splices are applied, by default `False`
    apply_others : str, optional
        A comma-separated list of names of custom adjustment types to apply, by default ""
    apply_all : bool, optional
        If true, applies all types of adjustments, both built-in and custom, by default `False`

    Returns
    -------
    onetick.py.Source
        A new source object with applied adjustments.

    See also
    --------
    **CORP_ACTIONS** OneTick event processor

    Examples
    --------
    >>> src = otp.DataSource('NYSE_TAQ',
    ...                      tick_type='TRD',
    ...                      start=otp.dt(2022, 5, 20, 9, 30),
    ...                      end=otp.dt(2022, 5, 26, 16))
    >>> df = otp.run(src, symbols='MKD', symbol_date=otp.date(2022, 5, 22))
    >>> df["PRICE"][0]
    0.0911
    >>> src = otp.corp_actions(src,
    ...                        adjustment_date=otp.date(2022, 5, 22),
    ...                        fields="PRICE",)
    >>> df = otp.run(src, symbols='MKD', symbol_date=otp.date(2022, 5, 22))
    >>> df["PRICE"][0]
    1.36649931675
    """
    if not isinstance(adjustment_date, (date, int, type(None))):
        adjustment_date = date(adjustment_date)
    if isinstance(adjustment_date, date):
        adjustment_date = int(adjustment_date.to_str())
    if adjustment_date_tz is default:
        adjustment_date_tz = config.tz
    if not adjustment_date_tz:
        warnings.warn("A local timezone can't be used in parameter 'adjustment_date_tz', setting to 'GMT'.")
        adjustment_date_tz = 'GMT'
    source.sink(
        otq.CorpActions(
            adjustment_date=adjustment_date,
            adjustment_date_tz=adjustment_date_tz,
            fields=fields,
            adjust_rule=adjust_rule,
            apply_split=apply_split,
            apply_spinoff=apply_spinoff,
            apply_cash_dividend=apply_cash_dividend,
            apply_stock_dividend=apply_stock_dividend,
            apply_security_splice=apply_security_splice,
            apply_others=apply_others,
            apply_all=apply_all,
        )
    )
    return source
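
# A hedged sketch for applying every adjustment type at once (grounded in the
# ``apply_all`` description above; the database, symbol, and dates are the same
# illustrative ones used in the docstring):
#
#     >>> src = otp.DataSource('NYSE_TAQ', tick_type='TRD',
#     ...                      start=otp.dt(2022, 5, 20, 9, 30),
#     ...                      end=otp.dt(2022, 5, 26, 16))
#     >>> src = otp.corp_actions(src,
#     ...                        adjustment_date=otp.date(2022, 5, 22),
#     ...                        fields="PRICE",
#     ...                        apply_all=True)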
def save_sources_to_single_file(sources, file_path=None, file_suffix='', start=None, end=None,
                                start_time_expression=None, end_time_expression=None,
                                timezone=None, running_query_flag=None):
    """
    Save onetick.py.Source objects to a single file.

    Parameters
    ----------
    sources: dict or list
        A dict of names -> sources or a list of sources to merge into a single file.
        If it's a list, the names will be autogenerated.
        A source can be an :class:`otp.Source` object or a dictionary with these allowed parameters:
        {
            'source': otp.Source,
            'start': datetime(2022, 1, 1),       # optional
            'end': datetime(2022, 1, 2),         # optional
            'symbols': otp.Source or otp.Symbols,  # optional
        }
    file_path: str, optional
        Path to the file where all sources will be saved.
        If not set, the sources will be saved to a temporary file and its name will be returned.
    file_suffix: str
        Only used if ``file_path`` is not set.
        This suffix will be added to the name of the generated query file.
    start: datetime, optional
        Start time for the resulting query file.
    end: datetime, optional
        End time for the resulting query file.
    start_time_expression: str, optional
        Start time expression for the resulting query file.
    end_time_expression: str, optional
        End time expression for the resulting query file.
    timezone: str, optional
        Timezone for the resulting query file.
    running_query_flag: bool, optional
        Running query flag for the resulting query file.

    Returns
    -------
    If `sources` is a list, returns a list of full query paths (path_to_file::query_name)
    with autogenerated names corresponding to each source from `sources`.
    If `sources` is a dict, the path to the query file is returned.
    """
    if isinstance(sources, dict):
        names = sources.keys()
        sources = sources.values()
        query_names = None
    else:
        names = repeat(None)
        query_names = []

    tmp_otq = TmpOtq()
    for name, source in zip(names, sources):
        query_start = query_end = query_symbols = None
        if isinstance(source, dict):
            query_start = source.get('start')
            query_end = source.get('end')
            query_symbols = source.get('symbols')
            source = source['source']
        query_name = source._store_in_tmp_otq(tmp_otq, name=name,
                                              start=query_start, end=query_end,
                                              symbols=query_symbols)
        if query_names is not None:
            query_names.append(query_name)

    file_path = tmp_otq.save_to_file(
        file_path=file_path,
        file_suffix=file_suffix,
        start=start,
        end=end,
        start_time_expression=start_time_expression,
        end_time_expression=end_time_expression,
        timezone=timezone,
        running_query_flag=running_query_flag,
    )

    if query_names is not None:
        return [f'{file_path}::{query_name}' for query_name in query_names]
    return file_path
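
# A hedged usage sketch (the file path is autogenerated here; the names below are
# illustrative): saving a dict of sources returns the path of the single query
# file, while saving a list returns 'path::query_name' strings.
#
#     >>> src1, src2 = otp.Ticks(X=[1]), otp.Ticks(Y=[2])
#     >>> path = save_sources_to_single_file({'first': src1, 'second': src2})
#     >>> full_paths = save_sources_to_single_file([src1, src2])  # list of 'path::name'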