import copy
import functools
import inspect
import os
import re
import uuid
import warnings
from collections import defaultdict
from datetime import datetime, date, time

import pandas as pd
import numpy as np
from typing import Callable, Optional, List, Any, Tuple, Union, Type, Dict, Collection
from onetick.py.backports import Literal

import onetick.query as otq
from onetick.lib.instance import OneTickLib
import onetick.py.functions
import onetick.py.sources
from onetick import py as otp
from onetick.py import aggregations
from onetick.py import types as ott
from onetick.py import utils, configuration
from onetick.py.core._internal._manually_bound_value import _ManuallyBoundValue
from onetick.py.core._internal._multi_symbols_source import _MultiSymbolsSource
from onetick.py.core._internal._proxy_node import _ProxyNode
from onetick.py.core._internal._state_objects import _StateColumn
from onetick.py.core._internal._state_vars import StateVars
from onetick.py.core._source._symbol_param_source import _SymbolParamSource
from onetick.py.core._source.schema import Schema
from onetick.py.core._source.symbol import Symbol
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.core.column import _Column, _LagOperator
from onetick.py.core.column_operations._methods.methods import is_arithmetical, is_compare
from onetick.py.core.column_operations._methods.op_types import are_strings, _replace_parameters
from onetick.py.core.column_operations.base import _Operation
from onetick.py.core.lambda_object import _LambdaIfElse, apply_lambda, apply_script, _EmulateObject
from onetick.py.core.query_inspector import get_query_info, add_pins, get_query_parameter_list
from onetick.py.core.eval_query import _QueryEvalWrapper, prepare_params
from onetick.py.core.cut_builder import _BaseCutBuilder
from onetick.py.core import db_constants
from onetick.py.utils import adaptive, adaptive_to_default
from onetick.py.aggregations._docs import (docstring, copy_method,
                                           _running_doc, _all_fields_with_policy_doc,
                                           _bucket_interval_doc, _bucket_time_doc,
                                           _bucket_units_doc, _bucket_end_condition_doc,
                                           _end_condition_per_group_doc, _boundary_tick_bucket_doc,
                                           _group_by_doc, _param_doc)
from onetick.py.aggregations.functions import (high_tick, low_tick, high_time, low_time,
                                               first_tick, last_tick, distinct, ranking,
                                               ob_snapshot, ob_snapshot_wide, ob_snapshot_flat)


def inplace_operation(method):
    """
    Decorator that adds the `inplace` parameter and logic according to this flag.
    inplace=True means that the method modifies the object,
    otherwise it copies the object first, modifies the copy and returns the copy.
    """
    @functools.wraps(method)
    def _inner(self, *args, inplace=False, **kwargs):
        kwargs['inplace'] = inplace
        if inplace:
            method(self, *args, **kwargs)
        else:
            obj = self.copy()
            return method(obj, *args, **kwargs)

    return _inner


def _is_dict_required(symbols):
    """
    Depending on symbols, determine if the output of otp.run() or Source.__call__()
    should always be a dictionary of {symbol: dataframe},
    even if only one symbol is present in the results
    """
    if isinstance(symbols, (list, tuple)):
        if len(symbols) == 0:
            return False
        elif len(symbols) > 1:
            return True
        else:
            symbols = symbols[0]
    if isinstance(symbols, otp.Source):
        return True
    if isinstance(symbols, otq.Symbol):
        symbols = symbols.name
    if isinstance(symbols, str) and 'eval' in symbols:
        return True
    return False


_agg_doc = _param_doc(
    name='aggs',
    annotation=Dict,
    str_annotation='dict of aggregations',
    desc="""
    aggregation dict: key - output column name; value - aggregation""",
)
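
# A minimal usage sketch of the `inplace_operation` decorator above
# (the class and method names are illustrative, not part of the library):
#
#     class Example(Source):
#         @inplace_operation
#         def do_nothing(self, inplace=False):
#             self.sink(otq.Passthrough())
#             return self
#
# `obj.do_nothing()` leaves `obj` untouched and returns a modified copy, while
# `obj.do_nothing(inplace=True)` modifies `obj` itself and returns None.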
class MetaFields:
    """
    http://solutions.pages.soltest.onetick.com/iac/onetick-server/ep_guide/EP/Pseudo-fields.htm

    OneTick defines several pseudo-columns that can be treated as if they were columns of every tick.
    These columns can be accessed directly via the :py:meth:`onetick.py.Source.__getitem__` method.
    But in case they are used in :py:class:`~onetick.py.core.column_operations.base.Expr`,
    they can be accessed via ``onetick.py.Source.meta_fields``.

    Examples
    --------
    Accessing pseudo-fields as columns or as class properties

    >>> data = otp.Tick(A=1)
    >>> data['X'] = data['_START_TIME']
    >>> data['Y'] = otp.Source.meta_fields['_TIMEZONE']
    >>> data(start=otp.dt(2003, 12, 2), timezone='GMT')
            Time  A          X    Y
    0 2003-12-02  1 2003-12-02  GMT
    """

    def __init__(self):
        self.timestamp = _Column('TIMESTAMP', dtype=ott.nsectime)
        self.time = self.timestamp
        self.start_time = _Column('_START_TIME', dtype=ott.nsectime)
        self.start = self.start_time
        self.end_time = _Column('_END_TIME', dtype=ott.nsectime)
        self.end = self.end_time
        self.timezone = _Column('_TIMEZONE', dtype=str)
        self.db_name = _Column('_DBNAME', dtype=str)
        self.symbol_name = _Column('_SYMBOL_NAME', dtype=str)
        self.tick_type = _Column('_TICK_TYPE', dtype=str)
        self.__fields = set(map(str, self.__dict__.values())) | {'Time'}

    def __iter__(self):
        yield from self.__fields

    def __contains__(self, item):
        return item in self.__fields

    def __len__(self):
        return len(self.__fields)
class Source:
    """
    Base class for representing Onetick execution graph.

    All :ref:`onetick-py sources <sources>` are derived from this class
    and have access to all its methods.

    Examples
    --------
    >>> data = otp.Tick(A=1)
    >>> isinstance(data, otp.Source)
    True

    Also this class can be used to initialize a raw source with the help of
    ``onetick.query`` classes, but it should be done with caution, as the user
    is required to set such properties as symbol name and tick type manually.

    >>> data = otp.Source(otq.TickGenerator(bucket_interval=0, fields='long A = 123').tick_type('TT'))
    >>> data(symbols='LOCAL::')
            Time    A
    0 2003-12-04  123
    """

    # TODO: need to support transactions for every _source
    # a transaction is a set of calls between _source creation and call or between two calls
    # if transactions have the same operations, then it seems we should add only one set of operations

    _PROPERTIES = ["__node",
                   "__hash",
                   "__use_name_for_column_prefix",
                   "__sources_keys_dates",
                   "__sources_modify_query_times",
                   "__sources_base_ep_func",
                   "__sources_symbols",
                   "__source_has_output",
                   "__name",
                   "_tmp_otq"]
    _OT_META_FIELDS = ["_START_TIME", "_END_TIME", "_SYMBOL_NAME", "_DBNAME", "_TICK_TYPE", "_TIMEZONE"]
    meta_fields = MetaFields()
    Symbol = Symbol

    def __init__(self, node=None, _symbols=None, _start=adaptive, _end=adaptive,
                 _base_ep_func=None, _has_output=True, **kwargs):
        self._tmp_otq = TmpOtq()
        self.__name = None

        if "Time" in kwargs:
            # TODO: add tests
            raise ValueError("It is not allowed to have 'Time' or 'TIMESTAMP' columns, "
                             "because they are the key columns")

        if "TIMESTAMP" not in kwargs:
            kwargs["TIMESTAMP"] = ott.nsectime
        kwargs["Time"] = ott.nsectime
        kwargs["_START_TIME"] = ott.nsectime
        kwargs["_END_TIME"] = ott.nsectime
        kwargs["_SYMBOL_NAME"] = str
        kwargs["_DBNAME"] = str
        kwargs["_TICK_TYPE"] = str
        kwargs["_TIMEZONE"] = str

        for key, value in kwargs.items():
            # calculate value type
            value_type = ott.get_source_base_type(value)
            self.__dict__[key] = _Column(name=key, dtype=value_type, obj_ref=self)

        # just an alias to Timestamp
        self.__dict__["Time"] = self.__dict__["TIMESTAMP"]
        self.__dict__["_state_vars"] = StateVars(self)

        if node is None:
            node = otq.Passthrough()

        self.__hash = uuid.uuid4()
        self.__sources_keys_dates = {}
        self.__sources_modify_query_times = {}
        self.__sources_base_ep_func = {}
        self.__sources_symbols = {}
        self.__source_has_output = _has_output

        if isinstance(node, _ProxyNode):
            self.__node = _ProxyNode(*node.copy_graph(), refresh_func=self.__refresh_hash)
        else:
            self.__node = _ProxyNode(*self.__from_ep_to_proxy(node), refresh_func=self.__refresh_hash)

        self.__sources_keys_dates[self.__node.key()] = (_start, _end)
        self.__sources_modify_query_times[self.__node.key()] = False
        self.__sources_base_ep_func[self.__node.key()] = _base_ep_func
        self.__sources_symbols[self.__node.key()] = _symbols

        # flag controls whether we have to add the node_name prefix when converting
        # columns into strings
        self.__use_name_for_column_prefix = False

    def _try_default_constructor(self, *args, node=None, **kwargs):
        if node is not None:
            # Source.copy() method will use this way;
            # all info from the original source will be copied by the copy() method after
            Source.__init__(self, *args, node=node, **kwargs)
            return True
        return False

    def base_ep(self, **kwargs):
        # default implementation
        # real implementation should return a Source object
        return None

    def _clean_sources_dates(self):
        self.__sources_keys_dates = {}
        self.__sources_modify_query_times = {}
        self.__sources_base_ep_func = {}
        self.__sources_symbols = {}

    def _set_sources_dates(self, other, copy_symbols=True):
        self.__sources_keys_dates.update(other._get_sources_dates())
        self.__sources_modify_query_times.update(other._get_sources_modify_query_times())
        self.__sources_base_ep_func.update(other._get_sources_base_ep_func())
        if copy_symbols:
            self.__sources_symbols.update(other._get_sources_symbols())
        else:
            # this branch is applicable for the bound symbols with callbacks,
            # where we drop all adaptive symbols and keep only manually specified
            # symbols
            manually_bound = {
                key: _ManuallyBoundValue(value)
                for key, value in other._get_sources_symbols().items()
                if value is not adaptive and value is not adaptive_to_default
            }
            self.__sources_symbols.update(manually_bound)
        self.__source_has_output = other._get_source_has_output()

    def _change_sources_keys(self, keys: dict):
        """
        Change keys in sources dictionaries.
        Needed, for example, after rebuilding the node history with new keys.

        Parameters
        ----------
        keys: dict
            Mapping from old key to new key
        """
        sources = (self.__sources_keys_dates,
                   self.__sources_modify_query_times,
                   self.__sources_base_ep_func,
                   self.__sources_symbols)
        for dictionary in sources:
            for key in list(dictionary):
                dictionary[keys[key]] = dictionary.pop(key)

    def _get_source_has_output(self):
        return self.__source_has_output

    def _get_sources_dates(self):
        return self.__sources_keys_dates

    def _get_sources_modify_query_times(self):
        return self.__sources_modify_query_times

    def _get_sources_base_ep_func(self):
        return self.__sources_base_ep_func

    def _get_sources_symbols(self):
        return self.__sources_symbols

    def use_name_for_column_prefix(self, flag=None):
        if flag is not None:
            self.__use_name_for_column_prefix = flag
        return self.__use_name_for_column_prefix

    def _check_key_in_properties(self, key: str) -> bool:
        if key in self.__class__._PROPERTIES:
            return True
        if key.replace('_' + Source.__name__.lstrip('_'), "") in self.__class__._PROPERTIES:
            return True
        if key.replace(self.__class__.__name__, "") in self.__class__._PROPERTIES:
            return True
        return False

    def __setattr__(self, key, value):
        if self._check_key_in_properties(key):
            self.__dict__[key] = value
            return
        if isinstance(value, _BaseCutBuilder):
            value(key)
            return
        value = self.__validate_before_setting(key, value)
        if key in self.__dict__:
            field = self.__dict__[key]
            if issubclass(type(field), _Column):
                self.__update_field(field, value)
            else:
                raise AttributeError(f'Column "{key}" not found')
        else:
            assert not (isinstance(value, _StateColumn) and value.obj_ref is None), \
                "State variables should be in `state` field"
            self.__add_field(key, value)

    def __validate_before_setting(self, key, value):
        if key in ["Symbol", "_SYMBOL_NAME"]:
            raise ValueError("Symbol setting is supported during creation only")
        if key == "_state_vars":
            raise ValueError("state field is necessary for keeping state variables and can't be rewritten")
        if isinstance(value, ott.ExpressionDefinedTimeOffset):
            value = value.n
        if isinstance(value, np.generic):
            value = value.item()
        if not (ott.is_type_supported(ott.get_object_type(value))
                or isinstance(value, _Operation)
                or type(value) is tuple):
            raise TypeError(f'It is not allowed to set objects of "{type(value)}" type')
        return value

    def _update_field(self, field, value):
        self.__update_field(field, value)

    def __update_field(self, field, value):
        def _replace_positive_lag_operator_with_tmp_column(operation):
            if isinstance(operation, _LagOperator) and operation.index > 0:
                column = operation._op_params[0]
                name = column.name
                if name.startswith("__"):
                    raise ValueError("Column name started with two underscores should be used by system only, "
                                     "please do not use such names.")
                name = f"__{name}_{operation.index}_NEW__"
                return _Column(name, column.dtype, column.obj_ref,
                               precision=getattr(column, "_precision", None))

        new_names, old_names = self.__get_old_and_new_names(_replace_positive_lag_operator_with_tmp_column, value)
        if old_names:
            self.sink(otq.AddFields(", ".join(f"{new} = {arg}" for arg, new in zip(old_names, new_names))))

        if type(value) is tuple:
            # support to be compatible with adding fields to get rid of some strange problems,
            # but really we do not use the passed type, because update field does not support it
            value, _ = value

        convert_to_type = None
        str_value = ott.value2str(value)
        value_dtype = ott.get_object_type(value)
        base_type = ott.get_base_type(value_dtype)
        if base_type is bool:
            # according to OneTick
            base_type = float

        type_changes = False  # because mantis 0021194
        if base_type is str:
            # update_field non-string field to string field (of any length) or value
            # changes type to default string
            if not issubclass(field.dtype, str):
                field._dtype = str
                type_changes = True
        else:
            if ((issubclass(field.dtype, int) or issubclass(field.dtype, float) or issubclass(field.dtype, str))
                    and (issubclass(value_dtype, ott.msectime) or issubclass(value_dtype, ott.nsectime))
                    and (str(field) != 'TIMESTAMP')
                    and (not isinstance(field, _StateColumn))):
                # in OneTick after updating fields with functions that return datetime values
                # the type of column will not change for long and double columns
                # and will change to long (or double in older versions) when updating string column
                # (see BDS-267)
                # That's why we are explicitly setting type for the returned value
                convert_to_type = value_dtype
                if issubclass(field.dtype, str):
                    # using update_field only for string because update_fields preserves type
                    # by default and raises an exception if it can't be done
                    type_changes = True
            elif issubclass(field.dtype, float) and base_type is int and not isinstance(field, _StateColumn):
                # PY-574 if field was float and value is int, then
                # it is still float in onetick, so no need to change type on otp level
                convert_to_type = int
            elif (issubclass(field.dtype, ott.msectime) or issubclass(field.dtype, ott.nsectime)) \
                    and base_type is int:
                # if field was of time type and we add something to it, then
                # no need to change type
                pass
            else:
                if issubclass(value_dtype, bool):
                    value_dtype = float
                if isinstance(field, _StateColumn):
                    pass  # do nothing
                else:
                    field._dtype = value_dtype
                    type_changes = True

        # for aliases, TIMESTAMP ~ Time as an example
        key = str(field)
        if key == "TIMESTAMP":
            self.__update_timestamp(key, value, str_value)
        elif type_changes:
            if value_dtype is otp.nsectime and issubclass(field.dtype, str):
                # PY-416 string field changing the type from str to datetime leads to losing nanoseconds
                # the workaround is: make a new column first, delete the accessor column
                # and then recreate it with the value from the temp column
                self.sink(otq.AddField(field=f"_TMP_{key}", value=str_value))
                self.sink(otq.Passthrough(fields=key, drop_fields=True))
                self.sink(otq.AddField(field=f"{key}", value=f"_TMP_{key}"))
                self.sink(otq.Passthrough(fields=f"_TMP_{key}", drop_fields=True))
            else:
                self.sink(otq.UpdateField(field=key, value=str_value))
        else:
            self.sink(otq.UpdateFields(set=key + "=" + str_value))
        if new_names:
            self.sink(otq.Passthrough(drop_fields=True, fields=",".join(new_names)))
        if convert_to_type:
            # manual type conversion after update fields for some cases
            self.table(**{key: convert_to_type}, inplace=True, strict=False)

    def __update_timestamp(self, key, value, str_value):
        if not hasattr(value, "dtype"):
            # A constant value: no need to pre- or post-sort
            self.sink(otq.UpdateField(field=key, value=str_value))
        elif (isinstance(value, _Column) and not isinstance(value, _StateColumn)
              and hasattr(value, "name") and value.name in self.__dict__):
            # An existing, present column: no need to create a temporary one. See PY-253
            need_to_sort = str_value not in ("_START_TIME", "_END_TIME")
            if need_to_sort:
                self.sort(value, inplace=True)
            self.sink(otq.UpdateField(field=key, value=str_value))
            if need_to_sort:
                self.sort(self.Time, inplace=True)
        elif (not is_compare(value) and isinstance(value, (_Operation, _LambdaIfElse))
              or is_arithmetical(value)):
            # An expression or a statevar: create a temp column with its value, pre- and post- sort.
            self.sink(otq.AddField(field="__TEMP_TIMESTAMP__", value=str_value))
            self.sink(otq.OrderByEp(order_by="__TEMP_TIMESTAMP__ ASC"))
            self.sink(otq.UpdateField(field=key, value="__TEMP_TIMESTAMP__"))
            self.sink(otq.Passthrough(fields="__TEMP_TIMESTAMP__", drop_fields=True))
            self.sort(self.Time, inplace=True)
        else:
            raise Exception(f"Illegal type for timestamp assignment: {value.__class__}")

    def __get_old_and_new_names(self, _replace_positive_lag_operator_with_tmp_column, value):
        old_args, new_args = _replace_parameters(value, _replace_positive_lag_operator_with_tmp_column)
        old_args = {str(old): (old, new) for old, new in zip(old_args, new_args)}
        old_names = []
        new_names = []
        for op_str, (_, new) in old_args.items():
            old_names.append(op_str)
            new_names.append(str(new))
        return new_names, old_names

    def append(self, other) -> 'Source':
        """
        Merge data source with `other`

        Parameters
        ----------
        other: List, Source
            data source to merge

        Returns
        -------
        Source
        """
        if isinstance(other, list):
            return onetick.py.functions.merge(other + [self])
        else:
            return onetick.py.functions.merge([self, other])

    def __add_field(self, key, value):
        # -------------
        # ADD_FIELDS
        # -------------
        # TODO: merge together several add fields in one add_fields
        if type(value) is tuple:
            value, dtype = value
        else:
            dtype = ott.get_object_type(value)
            if type(value) is str:
                if len(value) > ott.string.DEFAULT_LENGTH:
                    dtype = ott.string[len(value)]
        if issubclass(dtype, bool):
            # according to OneTick transformations
            dtype = float
        if issubclass(dtype, (ott.datetime, ott.date)):
            # according to OneTick transformations
            dtype = ott.nsectime
        # TODO: shouldn't all such logic be in ott.type2str?
        if np.issubdtype(dtype, np.integer):
            dtype = int
        if np.issubdtype(dtype, np.floating):
            dtype = float
        type_str = ott.type2str(dtype)
        # format value
        str_value = ott.value2str(value)
        self.sink(otq.AddField(field=f'{type_str} {key}', value=str_value))
        self.__dict__[key] = _Column(key, dtype, self)
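
    # A minimal sketch of the field machinery defined above (column names are
    # illustrative). Assigning a name that is not in the schema goes through
    # __add_field and sinks an ADD_FIELD EP; assigning to an existing column
    # goes through __update_field and sinks UPDATE_FIELD(S):
    #
    #     data = otp.Tick(A=1)
    #     data['B'] = 2.5            # new column -> ADD_FIELD "double B"
    #     data['A'] = data['A'] * 2  # existing column -> UPDATE_FIELDS
    #
    # Source.append() above is simply otp.merge([self, other]) under the hood.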
    def __getitem__(self, item):
        """
        Allows to express multiple things:

        - access a field by name
        - filter ticks by condition
        - select subset of fields
        - set order of fields

        Parameters
        ----------
        item: str, :class:`Operation`, :func:`eval`, List[str]

            - ``str`` is to access column.
            - ``Operation`` to express filter condition.
            - ``otp.eval`` to express filter condition based on external query
            - ``List[str]`` select subset of specified columns
            - ``slice[List[str]::]`` set order of columns
            - ``slice[Tuple[str, Type]::]`` type defaulting
            - ``slice[:]`` alias to :meth:`Source.copy()`

        Returns
        -------
        Column, Source or tuple of Sources
            - Column if column name was specified.
            - Two sources if filtering expression or eval was provided:
              the first one is for ticks that pass the condition and the second one for those that do not.

        Examples
        --------
        Access to the `X` column: add `Y` based on `X`

        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data['Y'] = data['X'] * 2
        >>> otp.run(data)
                             Time  X  Y
        0 2003-12-01 00:00:00.000  1  2
        1 2003-12-01 00:00:00.001  2  4
        2 2003-12-01 00:00:00.002  3  6

        Filtering based on expression

        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data_more, data_less = data[(data['X'] > 2)]
        >>> otp.run(data_more)
                             Time  X
        0 2003-12-01 00:00:00.002  3
        >>> otp.run(data_less)
                             Time  X
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.001  2

        Filtering based on the result of another query. The other query should have
        only one tick as a result with only one field (whatever its name).

        >>> exp_to_select = otp.Ticks(WHERE=['X > 2'])
        >>> data = otp.Ticks(X=[1, 2, 3], Y=['a', 'b', 'c'], Z=[.4, .3, .1])
        >>> data, _ = data[otp.eval(exp_to_select)]
        >>> otp.run(data)
                             Time  X  Y    Z
        0 2003-12-01 00:00:00.002  3  c  0.1

        Select subset of specified columns

        >>> data = otp.Ticks(X=[1, 2, 3], Y=['a', 'b', 'c'], Z=[.4, .3, .1])
        >>> data = data[['X', 'Z']]
        >>> otp.run(data)
                             Time  X    Z
        0 2003-12-01 00:00:00.000  1  0.4
        1 2003-12-01 00:00:00.001  2  0.3
        2 2003-12-01 00:00:00.002  3  0.1

        Slice with list will keep all columns, but change order:

        >>> data = otp.Tick(Y=1, X=2, Z=3)
        >>> otp.run(data)
                Time  Y  X  Z
        0 2003-12-01  1  2  3
        >>> data = data[['X', 'Y']:]
        >>> otp.run(data)
                Time  X  Y  Z
        0 2003-12-01  2  1  3

        Slice can be used as a short-cut for :meth:`Source.copy`:

        >>> data[:]  # doctest: +ELLIPSIS
        <onetick.py.sources.Tick object at ...>

        See Also
        --------
        | :meth:`Source.table`: another and more generic way to select subset of specified columns
        | **PASSTHROUGH** OneTick event processor
        | **WHERE_CLAUSE** OneTick event processor
        """
        strict = True
        if isinstance(item, (_Operation, _QueryEvalWrapper)):
            if isinstance(item, _Operation):
                item = item._make_python_way_bool_expression()
            where = self.copy(ep=otq.WhereClause(where=str(item)))
            if_source = where.copy()
            if_source.node().out_pin("IF")
            else_source = where.copy()
            else_source.node().out_pin("ELSE")
            # TODO: add ability to remove then this ep, because it is required only for right output
            else_source.sink(otq.Passthrough())
            return if_source, else_source
        if isinstance(item, slice):
            if item.step:
                raise AttributeError("Source columns slice with step set makes no sense")
            if item.start and item.stop:
                raise AttributeError("Source columns slice with both start and stop set is not available now")
            if not item.start and item.stop:
                raise AttributeError("Source columns slice with only stop set is not implemented yet")
            if item.start is None and item.stop is None:
                return self.copy()
            item = item.start
            strict = False
            if isinstance(item, tuple):
                item = dict([item])
            elif isinstance(item, list):
                if not item:
                    return self.copy()
                item_type = list(set([type(x) for x in item]))
                if len(item_type) > 1:
                    raise AttributeError(f"Different types {item_type} in slice list are not supported")
                if item_type[0] == tuple:
                    item = dict(item)
        if isinstance(item, list):
            # ---------
            # TABLE
            # ---------
            items = []
            for it in item:
                if isinstance(it, _Column):
                    items.append(it.name)
                elif isinstance(it, str):
                    items.append(it)
                else:
                    raise ValueError(f"It is not supported to filter '{it}' object of '{type(it)}' type")
            # validation
            for item in items:
                if item not in self.schema:
                    existing_columns = ", ".join(self.schema.keys())
                    raise AttributeError(f"There is no '{item}' column. "
                                         f"There are existing columns: {existing_columns}")
            columns = {}
            dtypes = self.columns(skip_meta_fields=True)
            for column_name in items:
                if column_name not in ['Time', 'Timestamp', 'TIMESTAMP']:
                    columns[column_name] = dtypes[column_name]
            return self.table(strict=strict, **columns)
        if isinstance(item, dict):
            return self.table(strict=strict, **item)

        name, dtype = "", None
        # way to set type
        if isinstance(item, tuple):
            name, dtype = item
        else:
            name = item
        if name not in self.__dict__:
            if dtype is None:
                raise KeyError(f'Column name {name} is not in the schema. Please, check that this column '
                               'is in the schema or add it using the .schema property')
            if name == 0 or name == 1:
                raise ValueError(f"constant {name} is not supported for indexing for now, please use otp.Empty")
            if type(name) in (int, float):
                raise ValueError("integer indexes are not supported")
            self.__dict__[name] = _Column(name, dtype, self)
        else:
            if not isinstance(self.__dict__[name], _Column):
                raise AttributeError(f"There is no '{name}' column")
            if dtype:
                type1, type2 = self.__dict__[name].dtype, dtype
                b_type1, b_type2 = ott.get_base_type(type1), ott.get_base_type(type2)
                if b_type1 != b_type2:
                    if {type1, type2} == {int, float}:
                        self.__dict__[name]._dtype = float
                    else:
                        raise Warning(f"Column '{name}' was declared as '{type1}', but you want to change it "
                                      f"to '{type2}', that is not possible without setting the type directly "
                                      "via assigning a value")
                else:
                    if issubclass(b_type1, str):
                        t1_length = ott.string.DEFAULT_LENGTH if type1 is str else type1.length
                        t2_length = ott.string.DEFAULT_LENGTH if type2 is str else type2.length
                        self.__dict__[name]._dtype = type2 if t1_length < t2_length else type1
                    if {type1, type2} == {ott.nsectime, ott.msectime}:
                        self.__dict__[name]._dtype = ott.nsectime
        return self.__dict__[name]
    def __setitem__(self, key, value):
        """
        Add new column to the source or update existing one.

        Parameters
        ----------
        key: str
            The name of the new or existing column.
        value: int, str, float, datetime, date, \
               :py:class:`~onetick.py.Column`, :py:class:`~onetick.py.Operation`, \
               :py:class:`~onetick.py.string`, \
               :py:class:`otp.date <onetick.py.date>`, :py:class:`otp.datetime <onetick.py.datetime>`, \
               :py:class:`~onetick.py.nsectime`, :py:class:`~onetick.py.msectime`
            The new value of the column.

        See also
        --------
        | **ADD_FIELD** OneTick event processor
        | **UPDATE_FIELD** OneTick event processor

        Examples
        --------
        >>> data = otp.Tick(A='A')
        >>> data['D'] = otp.datetime(2022, 2, 2)
        >>> data['X'] = 1
        >>> data['Y'] = data['X']
        >>> data['X'] = 12345
        >>> data['Z'] = data['Y'].astype(str) + 'abc'
        >>> otp.run(data)
                Time  A          D      X  Y     Z
        0 2003-12-01  A 2022-02-02  12345  1  1abc
        """
        return self.__setattr__(key, value)
    @inplace_operation
    def update(self, if_set, else_set=None, where=1, inplace=False) -> 'Source':
        """
        Update fields of the Source.

        Parameters
        ----------
        if_set: dict
            Dictionary <field name>: <expression>.
        else_set: dict, optional
            Dictionary <field name>: <expression>
        where: expression, optional
            Condition of updating. If ``where`` is True, the fields from ``if_set``
            will be updated with the corresponding expressions. If ``where`` is False,
            the fields from ``else_set`` will be updated with the corresponding expressions.
        inplace: bool
            A flag that controls whether the operation should be applied inplace.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.

        Returns
        -------
        :class:`Source` or ``None``.

        See also
        --------
        **UPDATE_FIELD** and **UPDATE_FIELDS** OneTick event processors

        Examples
        --------
        >>> # OTdirective: snippet-name: Arrange.conditional update;
        >>> t = otp.Ticks({'X': [1, 2, 3],
        ...                'Y': [4, 5, 6],
        ...                'Z': [1, 0, 1]})
        >>> t = t.update(if_set={'X': t['X'] + t['Y']},
        ...              else_set={'X': t['X'] - t['Y']},
        ...              where=t['Z'] == 1)
        >>> otp.run(t)  # OTdirective: snippet-example;
                             Time  X  Y  Z
        0 2003-12-01 00:00:00.000  5  4  1
        1 2003-12-01 00:00:00.001 -3  5  0
        2 2003-12-01 00:00:00.002  9  6  1
        """
        if else_set is None:
            else_set = {}
        if len(if_set) == 0 or not isinstance(if_set, dict):
            raise ValueError(f"'if_set' parameter should be a non-empty dict, "
                             f"but got '{if_set}' of type '{type(if_set)}'")

        def _prepare(to_prepare):
            result = {}
            for in_obj, out_obj in to_prepare.items():
                if isinstance(in_obj, _Column):
                    result[in_obj.name.strip()] = out_obj
                elif isinstance(in_obj, str):
                    result[in_obj.strip()] = out_obj
                else:
                    raise AttributeError(f"It is not supported to update item '{in_obj}' of type '{type(in_obj)}'")
            return result

        def _validate(to_validate):
            for in_key, out_obj in to_validate.items():
                if not (in_key in self.__dict__ and isinstance(self.__dict__[in_key], _Column)):
                    raise AttributeError(f"There is no '{in_key}' column to update")
                if in_key == "Time" or in_key == "TIMESTAMP":
                    raise ValueError("It is not allowed to modify 'Time' column using .update method")
                dtype = ott.get_object_type(out_obj)
                if not (issubclass(dtype, int) or issubclass(dtype, float) or issubclass(dtype, str)):
                    raise TypeError(f"Type '{dtype}' is not supported for setting '{in_key}' property")
                if isinstance(out_obj, bool):
                    to_validate[in_key] = int(out_obj)
            return to_validate

        # prepare and validate
        items = _validate(_prepare(if_set))
        else_items = _validate(_prepare(else_set))

        if isinstance(where, bool):
            where = int(where)
        if not (getattr(where, "dtype", None) is bool or isinstance(where, int)):
            raise ValueError(f"Where has not supported type '{type(where)}'")

        # apply
        set_rules = [f"{self[key]}=({ott.value2str(value)})" for key, value in items.items()]
        else_set_rules = [f"{self[key]}=({ott.value2str(value)})" for key, value in else_items.items()]
        self.sink(otq.UpdateFields(set=",".join(set_rules),
                                   else_set=",".join(else_set_rules),
                                   where=str(where)))
        return self
    @docstring(parameters=[_agg_doc, _running_doc, _all_fields_with_policy_doc,
                           _bucket_interval_doc, _bucket_time_doc, _bucket_units_doc,
                           _bucket_end_condition_doc, _end_condition_per_group_doc,
                           _boundary_tick_bucket_doc, _group_by_doc],
               add_self=True)
    def agg(self, aggs, *args, **kwargs) -> 'Source':
        """
        Applies composition of :ref:`otp.agg <aggregations_funcs>` aggregations

        See Also
        --------
        | :ref:`Aggregations <aggregations_funcs>`
        | **COMPUTE** OneTick event processor

        Examples
        --------
        By default the whole data is aggregated:

        >>> data = otp.Ticks(X=[1, 2, 3, 4])
        >>> data = data.agg({'X_SUM': otp.agg.sum('X')})
        >>> otp.run(data)
                Time  X_SUM
        0 2003-12-04     10

        Multiple aggregations can be applied at the same time:

        >>> data = otp.Ticks(X=[1, 2, 3, 4])
        >>> data = data.agg({'X_SUM': otp.agg.sum('X'),
        ...                  'X_MEAN': otp.agg.average('X')})
        >>> otp.run(data)
                Time  X_SUM  X_MEAN
        0 2003-12-04     10     2.5

        Aggregation can be used in running mode:

        >>> data = otp.Ticks(X=[1, 2, 3, 4])
        >>> data = data.agg({'CUM_SUM': otp.agg.sum('X')}, running=True)
        >>> otp.run(data)
                             Time  CUM_SUM
        0 2003-12-01 00:00:00.000        1
        1 2003-12-01 00:00:00.001        3
        2 2003-12-01 00:00:00.002        6
        3 2003-12-01 00:00:00.003       10

        Aggregation can be split in buckets:

        >>> data = otp.Ticks(X=[1, 2, 3, 4])
        >>> data = data.agg({'X_SUM': otp.agg.sum('X')}, bucket_interval=2, bucket_units='ticks')
        >>> otp.run(data)
                             Time  X_SUM
        0 2003-12-01 00:00:00.001      3
        1 2003-12-01 00:00:00.003      7

        Running aggregation can be used with buckets too:

        >>> data = otp.Ticks(X=[1, 2, 3, 4], offsets=[0, 1000, 1500, 3000])
        >>> data = data.agg(dict(X_MEAN=otp.agg.average("X"),
        ...                      X_STD=otp.agg.stddev("X")),
        ...                 running=True, bucket_interval=2)
        >>> otp.run(data)
                             Time  X_MEAN     X_STD
        0 2003-12-01 00:00:00.000     1.0  0.000000
        1 2003-12-01 00:00:00.001     1.5  0.500000
        2 2003-12-01 00:00:00.002     2.0  0.816497
        3 2003-12-01 00:00:00.003     2.5  1.118034
        4 2003-12-01 00:00:02.000     3.0  0.816497
        5 2003-12-01 00:00:02.001     3.5  0.500000
        6 2003-12-01 00:00:02.002     4.0  0.000000
        7 2003-12-01 00:00:02.003     NaN       NaN
        """
        aggs = aggs.copy()
        result = self.copy()
        what_to_aggregate = aggregations.compute(*args, **kwargs)
        for name, ag in aggs.items():
            what_to_aggregate.add(name, ag)
        result = what_to_aggregate.apply(result)
        result._add_table()
        return result
    def sort_values(self, *args, **kwargs):
        """
        alias of sort

        See Also
        --------
        :meth:`Source.sort`
        """
        return self.sort(*args, **kwargs)
    @inplace_operation
    def sort(self, by: Union[str, Collection[Union[str, 'onetick.py.Column']]],
             ascending=True, inplace=False) -> Optional['Source']:
        """
        Sort ticks by columns.

        Parameters
        ----------
        by: str, Column or list of them
            Column(s) to sort by. It is possible to pass a list of columns,
            where the order is important: from the left to the right.
        ascending: bool or list
            Order to sort by. If a list of columns is specified, then a list of
            ascending values per column is expected
            (``nan`` is the smallest for ``float`` type fields).
        inplace: bool
            A flag that controls whether the operation should be applied inplace.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.

        Returns
        -------
        :class:`Source`

        See also
        --------
        **ORDER_BY** OneTick event processor

        Examples
        --------
        Single column examples

        >>> data = otp.Ticks({'X': [ 94,       5,    34],
        ...                   'Y': [otp.nan, 3.1, -0.3]})
        >>> data = data.sort(data['X'])
        >>> otp.run(data)
                             Time   X    Y
        0 2003-12-01 00:00:00.001   5  3.1
        1 2003-12-01 00:00:00.002  34 -0.3
        2 2003-12-01 00:00:00.000  94  NaN

        >>> data = otp.Ticks({'X': [ 94,       5,    34],
        ...                   'Y': [otp.nan, 3.1, -0.3]})
        >>> data = data.sort(data['Y'])
        >>> otp.run(data)
                             Time   X    Y
        0 2003-12-01 00:00:00.000  94  NaN
        1 2003-12-01 00:00:00.002  34 -0.3
        2 2003-12-01 00:00:00.001   5  3.1

        Inplace

        >>> data = otp.Ticks({'X': [ 94,       5,    34],
        ...                   'Y': [otp.nan, 3.1, -0.3]})
        >>> data.sort(data['Y'], inplace=True)  # OTdirective: snippet-name: Arrange.sort.inplace;
        >>> otp.run(data)
                             Time   X    Y
        0 2003-12-01 00:00:00.000  94  NaN
        1 2003-12-01 00:00:00.002  34 -0.3
        2 2003-12-01 00:00:00.001   5  3.1

        Multiple columns

        >>> data = otp.Ticks({'X': [  5,   6,   3,   6],
        ...                   'Y': [1.4, 3.1, 9.1, 5.5]})
        >>> data = data.sort([data['X'], data['Y']])
        >>> otp.run(data)
                             Time  X    Y
        0 2003-12-01 00:00:00.002  3  9.1
        1 2003-12-01 00:00:00.000  5  1.4
        2 2003-12-01 00:00:00.001  6  3.1
        3 2003-12-01 00:00:00.003  6  5.5

        Ascending/descending control

        >>> data = otp.Ticks({'X': [ 94,       5,    34],
        ...                   'Y': [otp.nan, 3.1, -0.3]})
        >>> data = data.sort(data['X'], ascending=False)
        >>> otp.run(data)
                             Time   X    Y
        0 2003-12-01 00:00:00.000  94  NaN
        1 2003-12-01 00:00:00.002  34 -0.3
        2 2003-12-01 00:00:00.001   5  3.1

        >>> # OTdirective: snippet-name: Arrange.sort.sort;
        >>> data = otp.Ticks({'X': [  5,   6,   3,   6],
        ...                   'Y': [1.4, 3.1, 9.1, 5.5]})
        >>> data = data.sort([data['X'], data['Y']], ascending=[True, False])
        >>> otp.run(data)
                             Time  X    Y
        0 2003-12-01 00:00:00.002  3  9.1
        1 2003-12-01 00:00:00.000  5  1.4
        2 2003-12-01 00:00:00.003  6  5.5
        3 2003-12-01 00:00:00.001  6  3.1
        """
        columns = by
        if isinstance(columns, list):
            objs = columns
        else:
            objs = [columns]
        if isinstance(ascending, list):
            asc_objs = ascending
        else:
            asc_objs = [ascending]

        items = []

        # -------------------------------
        # Columns processing
        # -------------------------------
        # convert to strings
        # TODO: it seems as a common code, need to move to a separate function
        for obj in objs:
            if isinstance(obj, _Column):
                items.append(obj.name)
            elif isinstance(obj, str):
                items.append(obj)
            else:
                # TODO: cover with tests
                raise TypeError(f"It is not supported to order by '{obj}' of type '{type(obj)}'")

        # validate
        for item in items:
            if item in self.__dict__:
                if not isinstance(self.__dict__[item], _Column):
                    # TODO: cover with tests
                    raise AttributeError(f"There is no '{item}' column")
            else:
                # TODO: cover with tests
                raise AttributeError(f"There is no '{item}' column")

        # -------------------------------
        # Asc processing
        # -------------------------------
        asc_items = [True] * len(items)

        def asc_convert(v):
            return "ASC" if v else "DESC"

        for inx in range(len(items)):
            if inx >= len(asc_objs):
                asc_obj = asc_items[inx]
            else:
                asc_obj = asc_objs[inx]
            if isinstance(asc_obj, bool):
                asc_items[inx] = asc_convert(asc_obj)
            elif isinstance(asc_obj, int):
                asc_items[inx] = asc_convert(asc_obj)
            else:
                raise TypeError(f"asc can not be '{asc_obj}' of type '{type(asc_obj)}'")

        # ---------------
        # combine together
        order_by = [column_name + " " + asc for column_name, asc in zip(items, asc_items)]
        self.sink(otq.OrderByEp(order_by=",".join(order_by)))
        return self
    def _hash(self):
        return self.__hash

    def _merge_tmp_otq(self, source):
        self._tmp_otq.merge(source._tmp_otq)

    def __prepare_graph(self, symbols=None, start=None, end=None, has_output=False):
        # We copy the object here, because we will change it according to the passed
        # symbols and date ranges. For example, we can add a modify_query_times EP
        # if it is necessary
        obj = self.copy()
        if has_output:
            obj.sink(otq.Passthrough())
        start, end, symbols = obj._get_date_range(symbols, start, end)
        if start is adaptive:
            start = None
        if end is adaptive:
            end = None
        if symbols is not None and isinstance(symbols, pd.DataFrame):
            symbols = utils.get_symbol_list_from_df(symbols)
        if symbols is not None and not isinstance(symbols, list):
            symbols = [symbols]
        elif symbols is None:
            symbols = []
        _symbols = []
        for sym in symbols:
            _symbols.append(self._convert_symbol_to_string(sym, tmp_otq=obj._tmp_otq, start=start, end=end))
        return obj, start, end, _symbols
    def to_otq(self, file_name=None, file_suffix=None, query_name=None,
               symbols=None, start=None, end=None, timezone=None, raw=None,
               add_passthrough=True):
        """
        Save data source to .otq file and return the path to the saved file.

        Parameters
        ----------
        file_name: str
            Absolute or relative path to the saved file.
            If ``None``, create a temporary file and name it randomly.
        file_suffix: str
            Suffix to add to the saved file name (including extension).
            Can be specified if ``file_name`` is ``None``
            to distinguish between different temporary files.
            Default: ".to_otq.otq"
        query_name: str
            Name of the main query in the created file.
            If ``None``, take the name from this Source object.
            If that name is empty, set the name to "query".
        symbols: str, list, :pandas:`DataFrame <pandas.DataFrame>`, :class:`Source`
            symbols to save query with
        start: datetime
            start time to save query with
        end: datetime
            end time to save query with
        timezone: str
            timezone to save query with
        raw
            .. deprecated:: 1.4.17
        add_passthrough: bool
            will add a Passthrough ep at the end of the resulting graph

        Returns
        -------
        result: str
            Relative (if ``file_name`` is relative) or absolute path to the created query
            in the format ``file_name::query_name``
        """
        if raw is not None:
            warnings.warn('The "raw" flag is deprecated and makes no effect', DeprecationWarning)
        if timezone is None:
            timezone = configuration.config.tz
        file_path = str(file_name) if file_name is not None else None
        if file_suffix is None:
            file_suffix = self._name_suffix('to_otq.otq')
        if query_name is None:
            query_name = self.get_name(remove_invalid_symbols=True)
        if query_name is None:
            query_name = 'query'
        obj, start, end, symbols = self.__prepare_graph(symbols, start, end)
        graph = obj._to_graph(add_passthrough=add_passthrough)
        graph.set_symbols(symbols)
        return obj._tmp_otq.save_to_file(query=graph, query_name=query_name,
                                         file_path=file_path, file_suffix=file_suffix,
                                         start=start, end=end, timezone=timezone)
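
    # Usage sketch for to_otq() (illustrative; the returned string has the
    # "<file>::<query>" format and can be referenced from other queries):
    #
    #     path = otp.Tick(A=1).to_otq()
    #     # e.g. '/tmp/xxx.to_otq.otq::query'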
    def _store_in_tmp_otq(self, tmp_otq, operation_suffix="tmp_query", symbols=None,
                          start=None, end=None, raw=None, add_passthrough=True, name=None):
        """
        Adds this source to the tmp_otq storage.

        Parameters
        ----------
        tmp_otq: TmpOtq
            Storage object
        operation_suffix: str
            Suffix string to be added to the autogenerated graph name in the otq file
        name: str, optional
            If specified, this ``name`` will be used to save the query
            and the ``suffix`` parameter will be ignored.

        Returns
        -------
        result: str
            String with the name of the saved graph (starting with THIS::)
        """
        # We copy the object here, because we will change it according to the passed
        # symbols and date ranges. For example, we can add a modify_query_times EP
        # if it is necessary
        if raw is not None:
            warnings.warn('The "raw" flag is deprecated and makes no effect', DeprecationWarning)
        obj = self.copy()
        start, end, symbols = obj._get_date_range(symbols, start, end)
        tmp_otq.merge(obj._tmp_otq)
        if symbols and not isinstance(symbols, list):
            symbols = [symbols]
        elif symbols is None:
            symbols = []
        _symbols = []
        for sym in symbols:
            _symbols.append(self._convert_symbol_to_string(sym, tmp_otq))
        if isinstance(start, ott.dt):
            # OT save_to_file checks for the datetime time
            start = datetime.fromtimestamp(start.timestamp())
        if isinstance(end, ott.dt):
            end = datetime.fromtimestamp(end.timestamp())
        graph = obj._to_graph(add_passthrough=add_passthrough)
        graph.set_start_time(start)
        graph.set_end_time(end)
        graph.set_symbols(_symbols)
        suffix = self._name_suffix(suffix=operation_suffix, separator='__', remove_invalid_symbols=True)
        return tmp_otq.add_query(graph, suffix=suffix, name=name)

    def _save_as_tmp_otq(self, start=None, end=None, symbols=None, suffix=""):
        tmp_otq = utils.TmpFile(f"{suffix}.otq")
        query_path = self.to_otq(tmp_otq, symbols=symbols, start=start, end=end)
        return query_path
    def plot(self, y, x='Time', kind='line', **kwargs):
        """
        Executes the query with known properties and builds a plot from the resulting dataframe.

        Uses the pandas dataframe plot method to plot data.
        Other parameters could be specified through the ``kwargs``.

        Parameters
        ----------
        x: str
            Column name for the X axis
        y: str
            Column name for the Y axis
        kind: str
            The kind of plot

        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data.plot(y='X', kind='bar')  # doctest: +SKIP
        """
        result = self.copy()
        return result[[y, x]]().plot(x=x, y=y, kind=kind, **kwargs)
    def count(self, **kwargs):
        """
        Returns the number of ticks in the query.

        Adds an aggregation that calculates the total tick count, and executes the query.
        The result is a single value: the number of ticks.
        A possible application is in Jupyter, when a developer wants to check
        data presence, for example.

        Parameters
        ----------
        kwargs
            parameters that will be passed to :py:func:`otp.run <onetick.py.run>`

        Returns
        -------
        int

        See Also
        --------
        | :py:func:`otp.run <onetick.py.run>`
        | :meth:`Source.head`, :meth:`Source.tail`
        | :py:func:`onetick.py.agg.count`

        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data.count()
        3

        >>> data = otp.Empty()
        >>> data.count()
        0
        """
        result = self.copy()
        result = result.agg({'__num_rows': otp.agg.count()})(**kwargs)
        if result.empty:
            return 0
        return int(result['__num_rows'][0])
    def head(self, n=5, **kwargs) -> 'Union[pd.DataFrame, Source]':
        """
        Executes the query and returns the first ``n`` ticks as a pandas dataframe.

        It is useful in Jupyter when you want to observe the first ``n`` values.

        Parameters
        ----------
        n: int, default=5
            number of ticks to return
        kwargs:
            parameters will be passed to :py:func:`otp.run <onetick.py.run>`

        Returns
        -------
        :pandas:`DataFrame <pandas.DataFrame>`

        See Also
        --------
        | :py:func:`otp.run <onetick.py.run>`
        | :meth:`Source.tail`, :meth:`Source.count`

        Examples
        --------
        >>> data = otp.Ticks(X=list('abcdefgik'))
        >>> data.head()[['X']]
           X
        0  a
        1  b
        2  c
        3  d
        4  e
        """
        result = self.copy()
        result = result.first(n=n)  # pylint: disable=E1123
        return result(**kwargs)
    def tail(self, n=5, **kwargs) -> Union['pd.DataFrame', 'Source']:
        """
        Executes the query and returns the last ``n`` ticks as a pandas dataframe.

        It is useful in Jupyter when you want to observe the last ``n`` values.

        Parameters
        ----------
        n: int
            number of ticks to return
        kwargs:
            parameters will be passed to :py:func:`otp.run <onetick.py.run>`

        Returns
        -------
        :pandas:`DataFrame <pandas.DataFrame>`

        See Also
        --------
        | :py:func:`otp.run <onetick.py.run>`
        | :meth:`Source.head`, :meth:`Source.count`

        Examples
        --------
        >>> data = otp.Ticks(X=list('abcdefgik'))
        >>> data.tail()[['X']]
           X
        0  e
        1  f
        2  g
        3  i
        4  k
        """
        result = self.copy()
        result = result.last(n=n)  # pylint: disable=E1123
        return result(**kwargs)
    def _rename_impl_(self, columns=None, **kwargs):
        if len(columns) != 0 and len(kwargs) != 0:
            raise Exception("It is not allowed to rename using the 'columns' parameter and **kwargs "
                            "simultaneously. Use either the 'columns' parameter or **kwargs")
        if len(columns) == 0 and len(kwargs) != 0:
            columns = kwargs

        # prepare
        items = {}
        out_names = set()
        for in_obj, out_obj in columns.items():
            if isinstance(in_obj, _Column):
                items[in_obj.name.strip()] = out_obj.strip()
            elif isinstance(in_obj, str):
                items[in_obj.strip()] = out_obj.strip()
            else:
                raise Exception(f"It is not supported to rename item '{in_obj}' of type '{type(in_obj)}'")
            if out_obj in out_names:
                raise AttributeError(f"You want to rename '{in_obj}' into '{out_obj}', "
                                     f"but also want to rename another column into '{out_obj}'")
            out_names.add(out_obj)

        # validate
        for in_key, out_key in items.items():
            if " " in out_key:
                raise AttributeError(f"There is a space in the '{out_key}' column name")
            if in_key in self.__dict__ and isinstance(self.__dict__[in_key], _Column):
                pass
            else:
                raise AttributeError(f"There is no '{in_key}' column to rename")
            if out_key in self.__dict__ and isinstance(self.__dict__[out_key], _Column):
                raise AttributeError(f"Column '{out_key}' already exists")

        # apply
        for in_key, out_key in items.items():
            self.__dict__[in_key].rename(out_key, update_parent_object=False)
            self.__dict__[out_key] = self.__dict__[in_key]
            del self.__dict__[in_key]

        rename_rules = [key + "=" + value for key, value in items.items()]
        self.sink(otq.RenameFieldsEp(rename_fields=",".join(rename_rules)))
    @inplace_operation
    def rename(self, columns=None, inplace=False) -> Optional['Source']:
        """
        Rename columns.

        Parameters
        ----------
        columns: dict
            Rules of how to rename in the following format: {<column>: <new-column-name>},
            where <column> is either an existing column name of str type or a reference
            to a column, and <new-column-name> is a new column name of str type.
        inplace: bool
            The flag controls whether the operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.

        Returns
        -------
        :class:`Source` or ``None``

        See also
        --------
        **RENAME** OneTick event processor

        Examples
        --------
        >>> data = otp.Ticks(X=[1], Y=[2])
        >>> data = data.rename({'X': 'XX',
        ...                     data['Y']: 'YY'})
        >>> otp.run(data)
                Time  XX  YY
        0 2003-12-01   1   2
        """
        if columns is None:
            columns = {}
        self._rename_impl_(columns)
        return self
    @inplace_operation
    def execute(self, *operations, inplace=False) -> Optional['Source']:
        """
        Execute operations without returning their values.

        Some operations in onetick.py can be used to modify the state of some object
        (tick sequences mostly), and in that case the user may not want to save
        the result of the operation to a column.

        Parameters
        ----------
        operations: list of :py:class:`~onetick.py.Operation`
            operations to execute.
        inplace: bool
            The flag controls whether the operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.

        Returns
        -------
        :class:`Source` or ``None``

        See Also
        --------
        **EXECUTE_EXPRESSIONS** OneTick event processor

        Examples
        --------
        >>> data = otp.Tick(A=1)
        >>> data.state_vars['SET'] = otp.state.tick_set('oldest', 'A')
        >>> data = data.execute(data.state_vars['SET'].erase(A=1))
        """
        if not operations:
            raise ValueError('At least one operation must be specified in execute() method')
        op_str = ';'.join(map(str, operations))
        self.sink(otq.ExecuteExpressions(op_str))
        return self
    def __refresh_hash(self):
        """
        This internal function refreshes the hash on every graph modification.
        It is used only in _ProxyNode, because it tracks node changes
        """
        self.__hash = uuid.uuid3(uuid.NAMESPACE_DNS, str(self.__hash))

    def _prepare_for_execution(self, symbols=None, start=None, end=None,
                               start_time_expression=None, end_time_expression=None,
                               timezone=None, has_output=None, running_query_flag=None,
                               require_dict=False, node_name=None):
        if has_output is None:
            has_output = self.__source_has_output
        if timezone is None:
            timezone = configuration.config.tz
        obj, start, end, symbols = self.__prepare_graph(symbols, start, end, has_output)
        require_dict = require_dict or _is_dict_required(symbols)
        if node_name is None:
            node_name = 'SOURCE_CALL_MAIN_OUT_NODE'
        obj.node().node_name(node_name)
        graph = obj._to_graph(add_passthrough=False)
        graph.set_symbols(symbols)
        tmp_file = utils.TmpFile(suffix=self._name_suffix('run.otq'))
        query_to_run = obj._tmp_otq.save_to_file(
            query=graph,
            query_name=(self.get_name(remove_invalid_symbols=True)
                        if self.get_name(remove_invalid_symbols=True) else "main_query"),
            file_path=tmp_file.path,
            start=start, end=end,
            running_query_flag=running_query_flag,
            start_time_expression=start_time_expression,
            end_time_expression=end_time_expression,
            timezone=timezone,
        )
        # symbols and start/end times should be already stored in the query and should not be passed again
        return dict(
            query=query_to_run,
            symbols=None,
            start=None, end=None,
            start_time_expression=None, end_time_expression=None,
            require_dict=require_dict,
            node_name=node_name,
            time_as_nsec=True,
        )
    def __call__(self, *args, **kwargs):
        """
        .. deprecated:: 1.48.3
           Use :py:func:`otp.run <onetick.py.run>` instead.
        """
        warnings.warn('__call__() method is deprecated, use otp.run() instead', DeprecationWarning, stacklevel=2)
        return otp.run(self, *args, **kwargs)
    def to_df(self, symbols=None, **kwargs):
        """
        .. deprecated:: 1.48.3
           Use :py:func:`otp.run <onetick.py.run>` instead.
        """
        warnings.warn('to_df() method is deprecated, use otp.run() instead', DeprecationWarning, stacklevel=2)
        # For backward compatibility: otp.run() does not accept "symbols" as a non-keyword argument
        if symbols is not None:
            kwargs['symbols'] = symbols
        return otp.run(self, **kwargs)
    to_dataframe = to_df

    def print_api_graph(self):
        self.node().copy_graph(print_out=True)

    def _add_table(self, strict=False):
        table = otq.Table(
            fields=",".join(ott.type2str(dtype) + " " + name
                            for name, dtype in self.columns(skip_meta_fields=True).items()),
            keep_input_fields=not strict,
        )
        self.sink(table)

    def _is_unbound_required(self):
        """ Check whether a graph needs an unbound symbol or not """
        for symbol in self.__sources_symbols.values():
            if symbol is adaptive or symbol is adaptive_to_default:
                return True
        return False

    def _get_widest_time_range(self):
        """
        Get minimum start time and maximum end time.
        If time is not found, None is returned.
        """
        start_times = []
        end_times = []
        for start, end in self.__sources_keys_dates.values():
            if start is not adaptive:
                start_times.append(start)
            if end is not adaptive:
                end_times.append(end)
        start = min(start_times) if start_times else None
        end = max(end_times) if end_times else None
        return start, end

    def _get_date_range(self, symbols=None, start=None, end=None, default_start=None, default_end=None):  # noqa
        if default_start is None:
            default_start = configuration.config.get('default_start_time', adaptive)
        if default_end is None:
            default_end = configuration.config.get('default_end_time', adaptive)

        # ------------ #
        # Find symbols
        need_to_bind_symbol = False
        common_symbol = None
        if symbols is None:
            # let's try to understand whether we could use a common symbol for all sources
            # or we need to bind symbols instead
            first_symbol = None
            for symbol in self.__sources_symbols.values():
                if first_symbol is None:
                    first_symbol = symbol
                    if isinstance(first_symbol, _ManuallyBoundValue):
                        # Mark that we need to bind, but keep common_symbol equal to None.
                        # It is applicable for the bound symbols inside the merge with bound
                        # symbols, for example.
                        need_to_bind_symbol = True
                    else:
                        common_symbol = symbol
                    continue
                if symbol and symbol != first_symbol:
                    need_to_bind_symbol = True
                    common_symbol = None
                    break
            # symbol is specified nowhere - just set unbound to the default one
            if (first_symbol is adaptive or first_symbol is adaptive_to_default) and \
                    (common_symbol is adaptive or common_symbol is adaptive_to_default):
                common_symbol = configuration.config.default_symbol
        else:
            # when unbound symbols passed
            common_symbol = symbols
            need_to_bind_symbol = True  # used to check all sources whether some have bound symbols

        # Find max and min for _source data ranges
        sources_start, sources_end = self._get_widest_time_range()
        sources_start = sources_start or default_start
        sources_end = sources_end or default_end

        common_format = "%Y-%m-%d %H:%M:%S"

        for key, date_range in self.__sources_keys_dates.items():
            # find a function that builds _source
            func = self.__sources_base_ep_func[key]
            if func:
                src = func()
                # --------------------------
                # determine whether we have to add modify_query_times to a src
                start_date, end_date = date_range
                if not self.__sources_modify_query_times[key]:
                    if start_date is not adaptive or end_date is not adaptive:
                        # if one of the ends is specified, then it means
                        # we need to check whether it is worth wrapping into the
                        # modify_query_times
                        if start_date is adaptive:
                            if start is None:
                                start_date = sources_start
                            else:
                                start_date = start
                        if end_date is adaptive:
                            if end is None:
                                end_date = sources_end
                            else:
                                end_date = end
                        if start_date is not adaptive and end_date is not adaptive:
                            # it might happen when either sources_start/end are adaptive
                            # or start/end are adaptive
                            if ((start is None and sources_start is not adaptive and start_date != sources_start)
                                    or (start is not None and start_date != start)
                                    or (end is None and sources_end is not adaptive and end_date != sources_end)
                                    or (end is not None and end_date != end)):
                                self.__sources_modify_query_times[key] = True
                                mqt = otq.ModifyQueryTimes(
                                    start_time=('parse_time(("' + common_format + '.%q"),"'
                                                + start_date.strftime(common_format + ".%f")
                                                + '", _TIMEZONE)'),
                                    end_time=('parse_time(("' + common_format + '.%q"),"'
                                              + end_date.strftime(common_format + ".%f")
                                              + '", _TIMEZONE)'),
                                    output_timestamp="TIMESTAMP",
                                )
                                src.sink(mqt)

                if need_to_bind_symbol:
                    bound = None
                    if key in self.__sources_symbols:
                        # TODO: this is wrong, we need to know about symbols
                        # it happens when we do not copy symbols when applying
                        # merge with bound symbols.
                        # Wrong, in that case merge with bound symbol is
                        # indistinguishable from the manually passed None
                        # for external queries
                        bound = self.__sources_symbols[key]
                        if isinstance(bound, _ManuallyBoundValue):
                            bound = bound.value
                    if bound and bound is not adaptive and bound is not adaptive_to_default:
                        src.__node.symbol(bound)
                    else:
                        # if key is not in __sources_symbols, then
                        # it means that symbol was not specified, and
                        # therefore use unbound symbol
                        if common_symbol is None:
                            if bound is adaptive_to_default:
                                src.__node.symbol(configuration.config.default_symbol)
                            else:
                                pass
                                # # TODO: write a test validating this
                                # # raise Exception("One of the branches does not have a symbol specified")

                # --------------------------
                # glue _source with the main graph
                self.node().add_rules(src.node().copy_rules())
                self.source_by_key(src.node().copy_graph(), key)
                self._merge_tmp_otq(src)
            else:
                pass

        if start is None:
            start = sources_start
        if end is None:
            end = sources_end

        return start, end, common_symbol

    def _to_graph(self, add_passthrough=True):
        """
        Construct the graph. Only for internal usage.

        It is private, because it constructs the raw graph assuming that
        a graph is already defined, and might confuse an end user, because
        by default Source is not fully defined; it becomes fully defined only
        when symbols, start and end datetime are specified.
        """
        constructed_obj = self.copy()
        # we add it for the case when the last EP has a pin output
        if add_passthrough:
            constructed_obj.sink(otq.Passthrough())
        return otq.GraphQuery(constructed_obj.node().get())
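
    # A sketch of when _get_date_range() above adds MODIFY_QUERY_TIMES
    # (per-source start/end kwargs are assumed to be supported by sources,
    # as elsewhere in onetick.py):
    #
    #     day1 = otp.Ticks(X=[1], start=otp.dt(2003, 12, 1), end=otp.dt(2003, 12, 2))
    #     day2 = otp.Ticks(X=[2], start=otp.dt(2003, 12, 2), end=otp.dt(2003, 12, 3))
    #     both = otp.merge([day1, day2])
    #
    # Each branch keeps its own time range, so both branches are wrapped into
    # MODIFY_QUERY_TIMES, while the merged query runs over the widest range.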
    def to_graph(self, raw=None, symbols=None, start=None, end=None, *, add_passthrough=True):
        """
        Construct an otq.GraphQuery object.

        Parameters
        ----------
        raw:
            .. deprecated:: 1.4.17
               has no effect
        symbols:
            symbols query to add to otq.GraphQuery
        start: datetime
            start time of a query
        end: datetime
            end time of a query
        add_passthrough: bool
            add an additional Passthrough ep to the end of the resulting graph

        Returns
        -------
        otq.GraphQuery

        See Also
        --------
        :meth:`render`
        """
        if raw is not None:
            warnings.warn('The "raw" flag is deprecated and makes no effect', DeprecationWarning)
        _obj, _start, _end, _symbols = self.__prepare_graph(symbols, start, end)
        if _obj._tmp_otq.queries:
            warnings.warn('Using .to_graph() for a Source object that uses sub-queries! '
                          'This operation is deprecated and is not guaranteed to work as expected. '
                          'Such a Source should be executed using otp.run() or saved to disk using to_otq()',
                          DeprecationWarning)
            _obj.sink(otq.Passthrough().output_pin_name('OUT_FOR_TO_GRAPH'))
            _graph = _obj._to_graph(add_passthrough=False)
            _graph.set_start_time(_start)
            _graph.set_end_time(_end)
            _graph.set_symbols(_symbols)
            query = _obj._tmp_otq.save_to_file(query=_graph, file_suffix='_to_graph.otq')
            query_path, query_name = query.split('::')
            query_params = get_query_parameter_list(query_path, query_name)
            source_with_nested_query = otp.Query(
                otp.query(query, **{param: f'${param}' for param in query_params}),
                out_pin='OUT_FOR_TO_GRAPH')
            return source_with_nested_query.to_graph(symbols=_symbols, start=_start, end=_end,
                                                     add_passthrough=add_passthrough)
        else:
            return _obj._to_graph(add_passthrough=add_passthrough)
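
    # Usage sketch for to_graph(): the result is a plain onetick.query graph
    # object, so it can be passed to otq APIs directly:
    #
    #     graph = otp.Tick(A=1).to_graph()
    #     assert isinstance(graph, otq.GraphQuery)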
    def render(self, **kwargs):
        """
        Renders the calculation graph using the ``graphviz`` library.

        Every node is an event processor of the OneTick query language.
        Nodes in nested queries, first stage queries and eval queries are not shown.

        Could be useful for debugging and in Jupyter to learn the underlying graph.

        Note that it's required to have the :graphviz:`graphviz <>` package installed.

        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data1, data2 = data[(data['X'] > 2)]
        >>> data = otp.merge([data1, data2])
        >>> data.render()  # doctest: +SKIP

        .. graphviz:: ../../static/render_example.dot
        """
        kwargs.setdefault('verbose', True)
        self._to_graph().render(**kwargs)
    @inplace_operation
    def write(self,
              db: Union[str, 'otp.DB'],
              symbol: Optional[Union[str, 'otp.Column']] = None,
              tick_type: Optional[Union[str, 'otp.Column']] = None,
              date: Optional[date] = adaptive,
              append: bool = True,
              keep_symbol_and_tick_type: bool = adaptive,
              propagate: bool = True,
              out_of_range_tick_action: Literal['exception', 'ignore', 'load'] = 'exception',
              timestamp: Optional['otp.Column'] = None,
              keep_timestamp: bool = True,
              correction_type: Optional['otp.Column'] = None,
              replace_existing_time_series: bool = False,
              allow_concurrent_write: bool = False,
              context: str = adaptive,
              use_context_of_query: bool = False,
              inplace: bool = False,
              **kwargs) -> Optional['Source']:
        """
        Saves data result to a OneTick database.

        Note
        ----
        This method does not save anything by itself. It adds an instruction to the query
        to save the data. The data will be saved when the query is executed.

        Parameters
        ----------
        db: str or :py:class:`otp.DB <onetick.py.DB>`
            database name or object.
        symbol: str or Column
            resulting symbol name string or column to get the symbol name from.
            If this parameter is not set, the _SYMBOL_NAME pseudo-field is used.
            If it is empty, an attempt is made to retrieve the symbol name
            from the field named SYMBOL_NAME.
        tick_type: str or Column
            resulting tick type string or column to get the tick type from.
            If this parameter is not set, the _TICK_TYPE pseudo-field is used.
            If it is empty, an attempt is made to retrieve the tick type
            from the field named TICK_TYPE.
        date: datetime or None
            date for which to save the data.
            Should be set to `None` if writing to an accelerator or memory database.
            By default, it is set to `otp.config.default_date`.
        append: bool
            If False, data will be rewritten for this ``date``, otherwise data will be
            appended (new symbols are added, existing symbols can be modified
            (append new ticks, modify existing ticks)).
            This option is not valid for accelerator databases.
        keep_symbol_and_tick_type: bool
            keep fields containing symbol name and tick type
            when writing ticks to the database or propagating them.
            By default, this parameter is adaptive.
            If ``symbol`` or ``tick_type`` are column objects, then it's set to True.
            Otherwise, it's set to False.
        propagate: bool
            Propagate ticks after that event processor or not.
        out_of_range_tick_action: str
            Action to be executed if a tick's timestamp's date is not ``date``:

            * 'ignore': the tick will not be written to the database
            * 'exception': a runtime exception will be raised
        timestamp: Column
            Field that contains the timestamp with which the ticks will be written to the database.
            By default, the TIMESTAMP pseudo-column is used.
        keep_timestamp: bool
            If the ``timestamp`` parameter is set and this parameter is set to False,
            then the timestamp column is removed.
        correction_type: Column
            The name of the column that contains the correction type.
            This column will be removed.
            If this parameter is not set, no corrections will be submitted.
        replace_existing_time_series: bool
            If ``append`` is set to True, setting this option to True instructs the loader
            to replace existing time series, instead of appending to them.
            Other time series will remain unchanged.
        allow_concurrent_write: bool
            Allows different queries running on the same server
            to load concurrently into the same database.
        context: str
            The server context used to look up the database.
            By default, `otp.config.context` is used if ``use_context_of_query`` is not set.
        use_context_of_query: bool
            If this parameter is set to True and the ``context`` parameter is not set,
            the context of the query is used instead of the default value
            of the ``context`` parameter.
        inplace: bool
            A flag that controls whether the operation should be applied inplace.
            If ``inplace=True``, then it returns nothing.
            Otherwise, the method returns a new modified object.
        kwargs:
            .. deprecated::
               use named parameters instead.

        Returns
        -------
        :class:`Source` or None

        See also
        --------
        **WRITE_TO_ONETICK_DB** OneTick event processor

        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data = data.write('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE')
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.001  2
        2 2003-12-01 00:00:00.002  3
        >>> data = otp.DataSource('SOME_DB', symbol='S_WRITE', tick_type='T_WRITE')
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.001  2
        2 2003-12-01 00:00:00.002  3
        """
        if 'append_mode' in kwargs:
            warnings.warn("Parameter 'append_mode' is deprecated, use 'append'", DeprecationWarning)
            append = kwargs.pop('append_mode')
        if 'timestamp_field' in kwargs:
            warnings.warn("Parameter 'timestamp_field' is deprecated, use 'timestamp'", DeprecationWarning)
            timestamp = kwargs.pop('timestamp_field')
        if 'keep_timestamp_field' in kwargs:
            warnings.warn("Parameter 'keep_timestamp_field' is deprecated, use 'keep_timestamp'",
                          DeprecationWarning)
            keep_timestamp = kwargs.pop('keep_timestamp_field')
        if kwargs:
            raise TypeError(f'write() got unexpected arguments: {list(kwargs)}')
        kwargs = {}
        if date is adaptive:
            date = configuration.config.default_date
        if symbol is not None:
            if isinstance(symbol, _Column):
                kwargs['symbol_name_field'] = str(symbol)
                if keep_symbol_and_tick_type is adaptive:
                    keep_symbol_and_tick_type = True
            else:
                kwargs.setdefault('symbol_name_field', '_SYMBOL_NAME_FIELD_')
                self[kwargs['symbol_name_field']] = symbol
        if tick_type is not None:
            if isinstance(tick_type, _Column):
                kwargs['tick_type_field'] = str(tick_type)
                if keep_symbol_and_tick_type is adaptive:
                    keep_symbol_and_tick_type = True
            else:
                kwargs.setdefault('tick_type_field', '_TICK_TYPE_FIELD_')
                self[kwargs['tick_type_field']] = tick_type
        if keep_symbol_and_tick_type is adaptive:
            keep_symbol_and_tick_type = False
        if timestamp is not None:
            kwargs['timestamp_field'] = str(timestamp)
        if correction_type is not None:
            kwargs['correction_type_field'] = str(correction_type)
        if context is not adaptive:
            kwargs['context'] = context
        elif not use_context_of_query:
            kwargs['context'] = otp.config.context
        self.sink(
            otq.WriteToOnetickDb(database=str(db),
                                 date=date.strftime('%Y%m%d') if date else '',
                                 append_mode=append,
                                 keep_symbol_name_and_tick_type=keep_symbol_and_tick_type,
                                 propagate_ticks=propagate,
                                 out_of_range_tick_action=out_of_range_tick_action.upper(),
                                 keep_timestamp_field=keep_timestamp,
                                 replace_existing_time_series=replace_existing_time_series,
                                 allow_concurrent_write=allow_concurrent_write,
                                 use_context_of_query=use_context_of_query,
                                 **kwargs)
        )
        for col in ('_SYMBOL_NAME_FIELD_', '_TICK_TYPE_FIELD_'):
            if col in self.schema:
                self.drop(col, inplace=True)
        to_drop = set()
        if not keep_symbol_and_tick_type:
            to_drop.update((kwargs['symbol_name_field'], kwargs['tick_type_field']))
        if not keep_timestamp and timestamp is not None and str(timestamp) not in {'Time', 'TIMESTAMP'}:
            to_drop.add(str(timestamp))
        if correction_type is not None:
            to_drop.add(str(correction_type))
        self.schema.set(**{k: v for k, v in self.schema.items() if k not in to_drop})
        return self
    def copy(self, ep=None, columns=None, deep=False) -> 'Source':
        """
        Build an object with a copied calculation graph.

        Every node of the resulting graph has the same id as in the original.
        It means that if the original and copied graphs are merged or joined together
        later, then all common nodes (all that were created before the .copy() call)
        will be glued.

        For example, let's imagine that you have the following calculation graph ``G``

        .. graphviz::

            digraph {
                rankdir="LR";
                A -> B;
            }

        where ``A`` is a source and ``B`` is some operation on it.
        Then we copy it to ``G'`` and assign a new operation there

        .. graphviz::

            digraph {
                rankdir="LR";
                A -> B -> C;
            }

        After that we decide to merge ``G`` and ``G'``.
        The resulting calculation graph will be:

        .. graphviz::

            digraph {
                rankdir="LR";
                A -> B -> C -> MERGE;
                B -> MERGE;
            }

        Please use :meth:`Source.deepcopy` if you want to get the following
        calculation graph after merges and joins

        .. graphviz::

            digraph {
                rankdir="LR";
                A -> B -> C -> MERGE;
                "A'" -> "B'" -> "C'" -> MERGE;
            }

        Returns
        -------
        Source

        See Also
        --------
        Source.deepcopy
        """
        if columns is None:
            columns = self.columns()
        if ep:
            result = self.__class__(node=ep, **columns)
            result.source(self.node().copy_graph())
            # we need to clean it, because ep is not a _source
            result._clean_sources_dates()
        else:
            result = self.__class__(node=self.node(), **columns)
        result.node().add_rules(self.node().copy_rules(deep=deep))
        result._set_sources_dates(self)
        if deep:
            # generate all new uuids for node history and for sources
            # after they were initialized
            keys = defaultdict(uuid.uuid4)  # type: ignore
            result.node().rebuild_graph(keys)
            result._change_sources_keys(keys)
        # add state
        result._copy_state_vars_from(self)
        result._tmp_otq = self._tmp_otq.copy()
        result.__name = self.__name  # noqa
        result._copy_properties_from(self)
        return result
    def deepcopy(self, ep=None, columns=None) -> 'onetick.py.Source':
        """
        Copy the whole graph and change the ids of every node.

        More details can be found in :meth:`Source.copy`

        See Also
        --------
        Source.copy
        """
        return self.copy(ep, columns, deep=True)
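    # Usage sketch (not from the original source): contrasting the gluing
    # behavior of copy() and deepcopy() described above. After copy(), shared
    # nodes are glued on merge; after deepcopy(), the graphs stay independent.
    # >>> base = otp.Ticks(X=[1, 2, 3])   # graph G: A -> B
    # >>> shallow = base.copy()
    # >>> shallow['Y'] = 1                # graph G': A -> B -> C
    # >>> glued = base + shallow          # A and B are shared (glued)
    # >>> deep = base.deepcopy()
    # >>> deep['Y'] = 2
    # >>> separate = base + deep          # fully independent branches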
    def _copy_properties_from(self, obj):
        # needed if we are copying a child class with custom properties
        for attr in set(self.__class__._PROPERTIES) - set(Source._PROPERTIES):
            setattr(self, attr, getattr(obj, attr))

    def _copy_state_vars_from(self, objs):
        self.__dict__["_state_vars"] = StateVars(self, objs)
    def split(self, expr, cases, default=False) -> Tuple['Source', ...]:
        """
        Splits the data into several outputs, applying the passed expression ``expr``
        to the passed ``cases``. This method is an alias for :meth:`Source.switch`.

        Parameters
        ----------
        expr : Operation
            column or column-based expression
        cases : list
            list of values or :py:class:`onetick.py.range` objects to split by
        default : bool
            ``True`` adds the default output

        Returns
        -------
        Outputs according to the passed cases; the number of outputs is len(cases)
        plus one if ``default=True``

        See also
        --------
        | :meth:`Source.switch`
        | :py:class:`onetick.py.range`
        | **SWITCH** OneTick event processor

        Examples
        --------
        >>> # OTdirective: snippet-name: Source operations.split;
        >>> data = otp.Ticks(X=[0.33, -5.1, otp.nan, 9.4])
        >>> r1, r2, r3 = data.split(data['X'], [otp.nan, otp.range(0, 100)], default=True)
        >>> r1()
                             Time   X
        0 2003-12-01 00:00:00.002 NaN
        >>> r2()
                             Time     X
        0 2003-12-01 00:00:00.000  0.33
        1 2003-12-01 00:00:00.003  9.40
        >>> r3()
                             Time    X
        0 2003-12-01 00:00:00.001 -5.1
        """
        output_num = len(cases)

        # format cases
        def to_str(v):
            if isinstance(v, onetick.py.utils.range):
                return "[" + str(v.start) + "," + str(v.stop) + "]"
            elif isinstance(v, str):
                return '"' + v + '"'
            elif isinstance(v, tuple):
                return ",".join(map(to_str, list(v)))
            return str(v)

        cases = [f"{to_str(cases[inx])}:OUT{inx}" for inx in range(output_num)]
        # create ep
        params = dict(switch=str(expr), cases=";".join(cases))
        if default:
            params["default_output"] = "DEF_OUT"
        switch = self.copy(ep=otq.SwitchEp(**params))
        # construct results
        result = []
        for inx in range(output_num):
            res = switch.copy()
            res.node().out_pin(f"OUT{inx}")
            res.sink(otq.Passthrough())
            result.append(res)
        if default:
            res = switch.copy()
            res.node().out_pin("DEF_OUT")
            res.sink(otq.Passthrough())
            result.append(res)
        return tuple(result)
    def switch(self, expr, cases, default=False) -> Tuple['Source', ...]:
        """
        Splits the data into several outputs, applying the passed expression
        to the passed cases. This method is an alias for :meth:`Source.split`.

        Parameters
        ----------
        expr : Operation
            column or column-based expression
        cases : list
            list of values or :py:class:`onetick.py.range` objects to split by
        default : bool
            ``True`` adds the default output

        Returns
        -------
        Outputs according to the passed cases; the number of outputs is len(cases)
        plus one if ``default=True``

        See also
        --------
        | :meth:`Source.split`
        | :py:class:`onetick.py.range`
        | **SWITCH** OneTick event processor

        Examples
        --------
        >>> data = otp.Ticks(X=[0.33, -5.1, otp.nan, 9.4])
        >>> r1, r2, r3 = data.switch(data['X'], [otp.nan, otp.range(0, 100)], default=True)
        >>> r1()
                             Time   X
        0 2003-12-01 00:00:00.002 NaN
        >>> r2()
                             Time     X
        0 2003-12-01 00:00:00.000  0.33
        1 2003-12-01 00:00:00.003  9.40
        >>> r3()
                             Time    X
        0 2003-12-01 00:00:00.001 -5.1
        """
        return self.split(expr, cases, default)
    def columns(self, skip_meta_fields=False):
        """
        Return the columns of the data source

        Parameters
        ----------
        skip_meta_fields: bool, default=False
            do not add meta fields

        Returns
        -------
        dict
        """
        result = {}
        for key, value in self.__dict__.items():
            if skip_meta_fields and key in self.__class__.meta_fields:
                continue
            if key in self.__class__._PROPERTIES:
                continue
            if isinstance(value, _Column):
                result[value.name] = value.dtype
        return result

    def drop_columns(self):
        """
        Removes all columns in the python representation, but doesn't drop
        columns in the data. It is used when an external query is applied,
        because we don't know how the data schema has changed.
        """
        items = []
        for key, value in self.__dict__.items():
            if key in self.__class__.meta_fields or key in self.__class__._PROPERTIES:
                continue
            if isinstance(value, _Column):
                items.append(key)
        for item in items:
            del self.__dict__[item]

    def node(self):
        return self.__node

    def tick_type(self, tt):
        self.__node.tick_type(tt)
        return self

    def symbol(self, symbol):
        """
        Apply symbol to graph

        .. deprecated:: 1.3.31
        """
        warnings.warn("symbol method is deprecated, please specify symbol during creation", DeprecationWarning)
        self.__node.symbol(symbol)
        return self

    def node_name(self, name=None, key=None):
        return self.__node.node_name(name, key)

    def __add__(self, other) -> 'Source':
        return onetick.py.functions.merge([self, other])
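    # Usage sketch (not from the original source): inspecting the python-side
    # schema with columns() and merging two sources with the ``+`` operator,
    # which delegates to onetick.py.functions.merge as defined above.
    # >>> data = otp.Ticks(X=[1, 2], S=['a', 'b'])
    # >>> data.columns(skip_meta_fields=True)
    # {'X': <class 'int'>, 'S': <class 'str'>}
    # >>> other = otp.Ticks(X=[3], S=['c'])
    # >>> merged = data + other  # same as otp.merge([data, other])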
    @inplace_operation
    def table(self, inplace=False, strict: bool = True, **schema) -> Optional['Source']:
        """
        Set the OneTick and python schema levels according to the ``schema`` parameter.

        The ``schema`` should contain either (field_name -> type) pairs
        or (field_name -> default value) pairs; ``None`` means no specified type,
        and OneTick considers such a field to be of the double type.

        Resulting ticks have the same field order as the ``schema``.
        If only some of the fields are specified (i.e. when ``strict=False``),
        the fields from the ``schema`` take the leftmost positions.

        Parameters
        ----------
        inplace: bool
            The flag controls whether the operation should be applied inplace
        strict: bool
            If set to ``False``, all fields present in an input tick will be present
            in the output tick. If ``True``, only the fields specified in the ``schema``.
        schema:
            field_name -> type or field_name -> default value pairs that should
            be applied on the source.

        Returns
        -------
        :class:`Source` or ``None``

        See Also
        --------
        | :attr:`Source.schema`
        | :meth:`__getitem__`: the table shortcut
        | **TABLE** OneTick event processor

        Examples
        --------
        Selection case

        >>> data = otp.Ticks(X1=[1, 2, 3],
        ...                  X2=[3, 2, 1],
        ...                  A1=["A", "A", "A"])
        >>> data = data.table(X2=int, A1=str)  # OTdirective: snippet-name: Arrange.set schema;
        >>> otp.run(data)
                             Time  X2 A1
        0 2003-12-01 00:00:00.000   3  A
        1 2003-12-01 00:00:00.001   2  A
        2 2003-12-01 00:00:00.002   1  A

        Defining default values case (note the order)

        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data = data.table(Y=0.5, strict=False)
        >>> otp.run(data)
                             Time    Y  X
        0 2003-12-01 00:00:00.000  0.5  1
        1 2003-12-01 00:00:00.001  0.5  2
        2 2003-12-01 00:00:00.002  0.5  3
        """
        def is_time_type_or_nsectime(obj):
            return ott.is_time_type(obj) or isinstance(obj, ott.nsectime)

        def transformer(name, obj):
            if obj is None:
                return name
            res = f'{ott.type2str(ott.get_object_type(obj))} {name}'
            if not isinstance(obj, Type) and not is_time_type_or_nsectime(obj):
                res += f' ({ott.value2str(obj)})'
            return res

        def get_type(value):
            if value is None:
                return float
            return ott.get_object_type(value)

        for c_name in list(schema.keys()):
            if c_name in self.__class__.meta_fields:
                # meta fields should not be propagated to the Table EP,
                # otherwise a new user-defined field with the same name will appear in the schema
                # and using this field will raise an "ambiguous use" error in OneTick
                warnings.warn(f"Meta field '{c_name}' should not be used in .table() method.")
                schema.pop(c_name)
        if not schema:
            return self

        schema_to_set = {c_name: get_type(c_value) for c_name, c_value in schema.items()}
        if strict:
            self.schema.set(**schema_to_set)
        else:
            self.schema.update(**schema_to_set)

        fields = ','.join([transformer(c_name, c_value) for c_name, c_value in schema.items()])
        self.sink(otq.Table(fields=fields, keep_input_fields=not strict))
        for c_name, c_value in schema.items():
            # datetime and nsectime values require onetick built-in functions to be initialized,
            # but built-in functions can't be used in the Table EP, so update columns after the table
            if is_time_type_or_nsectime(c_value):
                self.update({c_name: c_value}, where=self[c_name] == 0, inplace=True)
        self._fix_varstrings()
        return self
    def _fix_varstrings(self):
        """
        PY-556: converting to varstring results in a string with null characters
        """
        varstring_columns = {name: self[name]
                             for name, dtype in self.schema.items()
                             if dtype is ott.varstring}
        # just updating the column removes the null characters
        if varstring_columns:
            self.update(varstring_columns, inplace=True)
    @inplace_operation
    def drop(self, columns: List[Any], inplace=False) -> Optional['Source']:
        """
        Remove a list of columns from the Source.

        If a column with such a name isn't found, an error is raised;
        if a regex is specified and no columns match it, nothing happens.
        A regex is any string containing any of the characters ***+?\\:[]{}()**.
        A dot is a valid symbol in a OneTick identifier, so **a.b** is passed
        to OneTick as an identifier; if you want to specify such a regex,
        use parentheses: **(a.b)**

        Parameters
        ----------
        columns : str, Column or list of them
            Column(s) to remove. You can specify a regex or a collection of regexes;
            in this case all columns with matching names are deleted.
        inplace: bool
            A flag that controls whether the operation should be applied inplace.
            If inplace=True, the method returns nothing; otherwise it returns
            a new modified object.

        Returns
        -------
        :class:`Source` or ``None``

        See Also
        --------
        **PASSTHROUGH** OneTick event processor

        Examples
        --------
        >>> data = otp.Ticks(X1=[1, 2, 3],
        ...                  X2=[3, 2, 1],
        ...                  A1=["A", "A", "A"])
        >>> data = data.drop("X1")  # OTdirective: snippet-name: Arrange.drop.one field;
        >>> otp.run(data)
                             Time  X2 A1
        0 2003-12-01 00:00:00.000   3  A
        1 2003-12-01 00:00:00.001   2  A
        2 2003-12-01 00:00:00.002   1  A

        Regexes can also be specified; in this case all matching columns are deleted

        >>> data = otp.Ticks(X1=[1, 2, 3],
        ...                  X2=[3, 2, 1],
        ...                  A1=["A", "A", "A"])
        >>> data = data.drop(r"X\d+")  # OTdirective: snippet-name: Arrange.drop.regex;
        >>> otp.run(data)
                             Time A1
        0 2003-12-01 00:00:00.000  A
        1 2003-12-01 00:00:00.001  A
        2 2003-12-01 00:00:00.002  A

        Several parameters can be specified

        >>> # OTdirective: snippet-name: Arrange.drop.multiple;
        >>> data = otp.Ticks(X1=[1, 2, 3],
        ...                  X2=[3, 2, 1],
        ...                  Y1=[1, 2, 3],
        ...                  Y2=[3, 2, 1],
        ...                  YA=["a", "b", "c"],
        ...                  A1=["A", "A", "A"])
        >>> data = data.drop([r"X\d+", "Y1", data["Y2"]])
        >>> otp.run(data)
                             Time YA A1
        0 2003-12-01 00:00:00.000  a  A
        1 2003-12-01 00:00:00.001  b  A
        2 2003-12-01 00:00:00.002  c  A

        **a.b** is passed to OneTick as an identifier; if you want to specify
        such a regex, use parentheses: **(a.b)**

        >>> data = otp.Ticks({"COLUMN.A": [1, 2, 3], "COLUMN1A": [3, 2, 1],
        ...                   "COLUMN1B": ["a", "b", "c"], "COLUMN2A": ["c", "b", "a"]})
        >>> data = data.drop("COLUMN.A")  # OTdirective: skip-snippet:;
        >>> otp.run(data)
                             Time  COLUMN1A COLUMN1B COLUMN2A
        0 2003-12-01 00:00:00.000         3        a        c
        1 2003-12-01 00:00:00.001         2        b        b
        2 2003-12-01 00:00:00.002         1        c        a

        >>> data = otp.Ticks({"COLUMN.A": [1, 2, 3], "COLUMN1A": [3, 2, 1],
        ...                   "COLUMN1B": ["a", "b", "c"], "COLUMN2A": ["c", "b", "a"]})
        >>> data = data.drop("(COLUMN.A)")  # OTdirective: skip-snippet:;
        >>> otp.run(data)
                             Time COLUMN1B
        0 2003-12-01 00:00:00.000        a
        1 2003-12-01 00:00:00.001        b
        2 2003-12-01 00:00:00.002        c
        """
        self.__delitem__(columns)
        return self
    @inplace_operation
    def dropna(self, how: Literal["any", "all"] = "any", subset: Optional[List[Any]] = None,
               inplace=False) -> Optional['Source']:
        """
        Drops ticks that contain NaN values according to the policy in the ``how`` parameter

        Parameters
        ----------
        how: "any" or "all"
            ``any`` - filters out ticks if at least one field has a NaN value.
            ``all`` - filters out ticks if all fields have NaN values.
        subset: list of str
            list of columns to check for NaN values.
            If ``None`` then all columns are checked.
        inplace: bool
            the flag controls whether the operation should be applied inplace.

        Returns
        -------
        :class:`Source` or ``None``

        Examples
        --------
        Drop ticks where **at least one** field has a ``nan`` value.

        >>> data = otp.Ticks([[     'X',     'Y'],
        ...                   [     0.0,     1.0],
        ...                   [ otp.nan,     2.0],
        ...                   [     4.0, otp.nan],
        ...                   [ otp.nan, otp.nan],
        ...                   [     6.0,     7.0]])
        >>> data = data.dropna()
        >>> otp.run(data)[['X', 'Y']]
             X    Y
        0  0.0  1.0
        1  6.0  7.0

        Drop ticks where **all** fields have ``nan`` values.

        >>> data = otp.Ticks([[     'X',     'Y'],
        ...                   [     0.0,     1.0],
        ...                   [ otp.nan,     2.0],
        ...                   [     4.0, otp.nan],
        ...                   [ otp.nan, otp.nan],
        ...                   [     6.0,     7.0]])
        >>> data = data.dropna(how='all')
        >>> otp.run(data)[['X', 'Y']]
             X    Y
        0  0.0  1.0
        1  NaN  2.0
        2  4.0  NaN
        3  6.0  7.0

        Drop ticks where **all** fields in a **subset** of columns have ``nan`` values.

        >>> data = otp.Ticks([[     'X',     'Y',     'Z'],
        ...                   [     0.0,     1.0, otp.nan],
        ...                   [ otp.nan,     2.0, otp.nan],
        ...                   [     4.0, otp.nan, otp.nan],
        ...                   [ otp.nan, otp.nan, otp.nan],
        ...                   [     6.0,     7.0, otp.nan]])
        >>> data = data.dropna(how='all', subset=['X', 'Y'])
        >>> otp.run(data)[['X', 'Y', 'Z']]
             X    Y   Z
        0  0.0  1.0 NaN
        1  NaN  2.0 NaN
        2  4.0  NaN NaN
        3  6.0  7.0 NaN
        """
        if how not in ["any", "all"]:
            raise ValueError(f"It is expected to see 'any' or 'all' values for 'how' parameter, but got '{how}'")
        condition = None
        columns = self.columns(skip_meta_fields=True)
        if subset is not None:
            for column_name in subset:
                if column_name not in columns:
                    raise ValueError(f"There is no '{column_name}' column in the source")
                if columns[column_name] is not float:
                    raise ValueError(f"Column '{column_name}' is not float type")
        for column_name, dtype in columns.items():
            if subset is not None and column_name not in subset:
                continue
            if dtype is float:
                if condition is None:
                    condition = self[column_name] != ott.nan
                else:
                    if how == "any":
                        condition &= self[column_name] != ott.nan
                    elif how == "all":
                        condition |= self[column_name] != ott.nan
        self.sink(otq.WhereClause(where=str(condition)))
        return self
    def __delitem__(self, obj):
        if isinstance(obj, list) or isinstance(obj, tuple):
            objs = obj
        else:
            objs = (obj,)
        # we can't be sure the python Source has a consistent columns cache, because sinking complex
        # event processors can change columns unpredictably; so if the user specifies a regex as a param,
        # we pass the regex as an onetick param, but delete all matched columns from the python Source cache.
        # items_to_passthrough - items to pass to Passthrough as parameters
        # names_of_columns - names of onetick.py Source columns to delete from the Source instance
        items_to_passthrough, names_of_columns, regex = self._get_columns_names(objs)
        self._validate_columns_names(names_of_columns)
        for item in names_of_columns:
            del self.__dict__[item]
        self.sink(otq.Passthrough(drop_fields=True, fields=",".join(items_to_passthrough), use_regex=regex))

    def _validate_columns_names(self, names_of_columns):
        for item in names_of_columns:
            if item in self.__dict__:
                if not isinstance(self.__dict__[item], _Column):
                    raise AttributeError(f"There is no '{item}' column")
            else:
                raise AttributeError(f"There is no '{item}' column")

    def _get_columns_names(self, objs):
        items_to_passthrough = []
        names_of_columns = []
        regex = False
        for obj in objs:
            if isinstance(obj, _Column):
                items_to_passthrough.append(obj.name)
                names_of_columns.append(obj.name)
            elif isinstance(obj, str):
                items_to_passthrough.append(obj)
                if any(c in "*+?\\:[]{}()" for c in obj):
                    # it is a possible regex
                    regex = True
                    names_of_columns.extend(col for col in self.columns() if re.match(obj, col))
                else:
                    names_of_columns.append(obj)
            else:
                raise TypeError(f"It is not supported to delete item '{obj}' of type '{type(obj)}'")
        # remove duplicates and meta_fields
        names_of_columns = set(names_of_columns) - set(self.__class__.meta_fields)
        return items_to_passthrough, list(names_of_columns), regex

    def __from_ep_to_proxy(self, ep):
        in_pin, out_pin = None, None
        if isinstance(ep, otq.graph_components.EpBase.PinnedEp):
            if hasattr(ep, "_output_name"):
                out_pin = getattr(ep, "_output_name")
            else:
                in_pin = getattr(ep, "_input_name")
            ep = getattr(ep, "_ep")
        return ep, uuid.uuid4(), in_pin, out_pin
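    # Usage sketch (not from the original source): ``del`` delegates to
    # __delitem__ above, so columns can be removed directly or by regex,
    # mirroring Source.drop.
    # >>> data = otp.Ticks(X1=[1], X2=[2], A=['a'])
    # >>> del data['A']        # drop a single column
    # >>> del data[r'X\d+']    # regex: drops X1 and X2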
    def sink(self, ep, out_pin=None, move_node=True):
        """
        Appends a node to the source inplace. Connects `out_pin` of this source to ``ep``.

        Can be used to connect onetick.query objects to an onetick.py source.
        Data schema changes (added or deleted columns) are not detected automatically
        after applying the `sink` function, so the user must adjust the schema manually
        by updating the `schema` property.

        Parameters
        ----------
        ep: otq.graph_components.EpBase,\
            otq.graph_components.EpBase.PinnedEp,\
            Tuple[otq.graph_components.EpBase, uuid.uuid4, Optional[str], Optional[str]]
            onetick.query EP object to append to the source.
        out_pin: Optional[str], default=None
            name of the out pin to connect to ``ep``
        move_node: bool, default=True

        Returns
        -------
        result: otq.graph_components.EpBase, otq.graph_components.EpBase.PinnedEp
            Last node of the source

        See Also
        --------
        onetick.py.Source.schema
        onetick.py.core._source.schema.Schema

        Examples
        --------
        Adding column 'B' directly with an onetick.query EP.

        >>> data = otp.Tick(A=1)
        >>> data.sink(otq.AddField(field='B', value=2))  # OTdirective: skip-snippet:;
        AddField(field='B',value=2)
        >>> otp.run(data)  # OTdirective: skip-snippet:;
                Time  A  B
        0 2003-12-01  1  2

        But we can't use this column with onetick.py methods yet.

        >>> data['C'] = data['B']  # OTdirective: skip-snippet:; # doctest: +ELLIPSIS
        Traceback (most recent call last):
          ...
        KeyError: 'Column name B is not in the schema...

        We should manually change the source's schema

        >>> data.schema.update(B=int)  # OTdirective: skip-snippet:;
        >>> data['C'] = data['B']
        >>> otp.run(data)
                Time  A  B  C
        0 2003-12-01  1  2  2
        """
        if not (issubclass(type(ep), otq.graph_components.EpBase)
                or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp)
                or type(ep) is tuple):
            raise Exception("sinking is allowed only for EpBase instances")
        if type(ep) is tuple:
            # for an already existing EP fetched from a _ProxyNode
            return self.__node.sink(out_pin, *ep, move_node)
        else:
            return self.__node.sink(out_pin, *self.__from_ep_to_proxy(ep), move_node)
    def __rshift__(self, ep):
        """ duplicates the sink() method, but returns a new object """
        new_source = self.copy()
        new_source.sink(ep)
        return new_source

    def __irshift__(self, ep):
        """ duplicates the sink() method, but assigns the new object to the source """
        new_source = self.copy()
        new_source.sink(ep)
        return new_source

    def source(self, ep, in_pin=None):
        """ Add a node as a source to the root node """
        if not (issubclass(type(ep), otq.graph_components.EpBase)
                or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp)
                or type(ep) is tuple):
            raise Exception("sourcing is allowed only for EpBase instances")
        if type(ep) is tuple:
            # for an already existing EP fetched from a _ProxyNode
            return self.__node.source(in_pin, *ep)
        else:
            return self.__node.source(in_pin, *self.__from_ep_to_proxy(ep))

    def source_by_key(self, ep, to_key):
        """ Add a node as a source to a graph node by key """
        if not (issubclass(type(ep), otq.graph_components.EpBase)
                or issubclass(type(ep), otq.graph_components.EpBase.PinnedEp)
                or type(ep) is tuple):
            raise Exception("sourcing is allowed only for EpBase instances")
        if type(ep) is tuple:
            # for an already existing EP fetched from a _ProxyNode
            return self.__node.source_by_key(to_key, *ep)
        else:
            return self.__node.source_by_key(to_key, *self.__from_ep_to_proxy(ep))

    def apply_query(self, query, in_pin="IN", out_pin="OUT", **params):
        """
        Apply the data source to a query

        .. deprecated:: 1.3.77

        See Also
        --------
        Source.apply
        """
        warnings.warn('The "apply_query" method is deprecated. Please, use the .apply method instead or '
                      "call the reference queries directly.", DeprecationWarning)
        res = onetick.py.functions.apply_query(query, {in_pin: self}, [out_pin], **params)
        res.node().out_pin(out_pin)
        return res
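    # Usage sketch (not from the original source): ``>>`` copies the source and
    # sinks a raw onetick.query EP into the copy; like sink(), it does not
    # update the python schema, so the new field must be registered manually.
    # >>> data = otp.Tick(A=1)
    # >>> data = data >> otq.AddField(field='B', value=2)
    # >>> data.schema.update(B=int)  # sinking bypasses schema tracking
    # >>> data['C'] = data['B'] + 1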
    def apply(self, obj) -> Union['otp.Column', 'Source']:
        """
        Apply an object to the data source.

        Parameters
        ----------
        obj: onetick.py.query, Callable, type, onetick.query.GraphQuery

            - `onetick.py.query` allows to apply an external nested query

            - a python `Callable` allows to translate python code to a similar OneTick
              CASE expression. There are some limitations as to which python operators
              can be used in this callable.
              See :ref:`Python callables parsing guide <python callable parser>`
              article for details.
              In :ref:`Remote OTP with Ray<ray-remote>` any `Callable` must be decorated
              with the `@otp.remote` decorator,
              see :ref:`Ray usage examples<apply-remote-context>` for details.

            - `type` allows to apply the default type conversion

            - `onetick.query.GraphQuery` allows to apply a built onetick.query.Graph

        Returns
        -------
        Column, Source

        Examples
        --------
        Apply an external query to a tick flow. In this case it is assumed that the query
        has only one input and one output. Check the :class:`query` examples if you want
        to use a query with multiple inputs or outputs.

        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> external_query = otp.query('update.otq')
        >>> data = data.apply(external_query)
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:00.000  2
        1 2003-12-01 00:00:00.001  4
        2 2003-12-01 00:00:00.002  6

        Apply a predicate to a column / operation. In this case the values passed
        to the predicate are the column values. The result is a column.

        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data['Y'] = data['X'].apply(lambda x: x * 2)
        >>> otp.run(data)
                             Time  X  Y
        0 2003-12-01 00:00:00.000  1  2
        1 2003-12-01 00:00:00.001  2  4
        2 2003-12-01 00:00:00.002  3  6

        Another example of applying a more sophisticated operation

        >>> data = otp.Ticks(X=[1, 2, 3])
        >>> data['Y'] = data['X'].apply(lambda x: 1 if x > 2 else 0)
        >>> otp.run(data)
                             Time  X  Y
        0 2003-12-01 00:00:00.000  1  0
        1 2003-12-01 00:00:00.001  2  0
        2 2003-12-01 00:00:00.002  3  1

        Example of applying a predicate to a Source. In this case the value passed
        to the predicate is a whole tick. The result is a column.

        >>> data = otp.Ticks(X=[1, 2, 3], Y=[.5, -0.4, .2])
        >>> data['Z'] = data.apply(lambda tick: 1 if abs(tick['X'] * tick['Y']) > 0.5 else 0)
        >>> otp.run(data)
                             Time  X    Y  Z
        0 2003-12-01 00:00:00.000  1  0.5  0
        1 2003-12-01 00:00:00.001  2 -0.4  1
        2 2003-12-01 00:00:00.002  3  0.2  1

        See Also
        --------
        :py:class:`onetick.py.query`
        :py:meth:`onetick.py.Source.script`
        :ref:`Python callables parsing guide <python callable parser>`
        """
        if isinstance(obj, onetick.py.sources.query):
            graph = obj.graph_info
            if len(graph.nested_inputs) != 1:
                raise Exception(f'It is expected the query "{obj.query_name}" to have one input, but it '
                                f"has {len(graph.nested_inputs)}")
            if len(graph.nested_outputs) > 1:
                raise Exception(f'It is expected the query "{obj.query_name}" to have one or no output, but it '
                                f"has {len(graph.nested_outputs)}")
            in_pin = graph.nested_inputs[0].NESTED_INPUT
            if len(graph.nested_outputs) == 0:
                out_pin = None  # means no output
            else:
                out_pin = graph.nested_outputs[0].NESTED_OUTPUT
            return obj(**{in_pin: self})[out_pin]
        elif isinstance(obj, otq.GraphQuery):
            tmp_file = utils.TmpFile(suffix=".otq")
            obj.save_to_file(
                tmp_file.path,
                "graph_query_to_apply",
                # start and end times don't matter for this query, use some constants
                start=db_constants.DEFAULT_START_DATE,
                end=db_constants.DEFAULT_END_DATE,
            )
            query_info = get_query_info(tmp_file.path)
            if len(query_info.sources) != 1:
                raise Exception("It is expected the query to have one input, but it "
                                f"has {len(query_info.sources)}")
            if len(query_info.sinks) != 1:
                raise Exception("It is expected the query to have one output, but it "
                                f"has {len(query_info.sinks)}")
            add_pins(
                tmp_file.path,
                "graph_query_to_apply",
                [(query_info.sources[0], 1, "IN"), (query_info.sinks[0], 0, "OUT")],
            )
            return onetick.py.sources.query(tmp_file.path + "::graph_query_to_apply")(IN=self)["OUT"]
        return apply_lambda(obj, _EmulateObject(self))
    def dump(self, label: str = None, where: 'otp.Operation' = None,
             columns: Union[str, Tuple[str], List[str]] = None, callback: Callable = None):
        """
        Dumps the ``columns`` of the ticks that fit the ``where`` condition
        to std::out at runtime.

        Every dump has a corresponding header that always includes the TIMESTAMP field.
        Other fields can be configured using the ``columns`` parameter.
        A header can be augmented with the ``label`` parameter; this label is an additional
        column that helps to distinguish ticks from multiple dumps with the same schema,
        because ticks from different dumps can be mixed. This can happen because of
        OneTick multithreading and the operating system buffer between OneTick
        and the actual output.

        This method is helpful for debugging.

        Parameters
        ----------
        label: str
            A label for the dump. It adds a special column **_OUT_LABEL_** to all ticks
            and sets it to the specified value. It helps to distinguish ticks from
            multiple dumps, because the actual output could contain mixed ticks
            due to concurrency. ``None`` means no label.
        where: Operation
            A condition that allows to filter the ticks to dump. ``None`` means no filtering.
        columns: str, tuple or list
            List of columns that should be in the output. ``None`` means dump all columns.
        callback : callable
            A callable that preprocesses the source before printing.

        See also
        --------
        **WRITE_TEXT** OneTick event processor

        Examples
        --------
        >>> # OTdirective: skip-snippet:;
        >>> data.dump(label='Debug point', where=data['PRICE'] > 99.3, columns=['PRICE', 'QTY'])  # doctest: +SKIP
        >>> data.dump(columns="X", callback=lambda x: x.first(), label="first")  # doctest: +SKIP
        """
        self_c = self.copy()
        if callback:
            self_c = callback(self_c)
        if where is not None:
            # can't be simplified, because _Operation overrides __bool__
            self_c, _ = self_c[(where)]
        if columns:
            self_c = self_c[columns if isinstance(columns, (list, tuple)) else [columns]]
        if label:
            self_c['_OUT_LABEL_'] = label
        self_c.sink(otq.WriteText(
            formats_of_fields='TIMESTAMP=%|' + configuration.config.tz + '|%d-%m-%Y %H:%M:%S.%J',
            prepend_timestamp=False, prepend_symbol_name=False, propagate_ticks=True))
        # print <no data> in case there are 0 ticks
        self_c = self_c.agg({'COUNT': otp.agg.count()})
        self_c, _ = self_c[self_c['COUNT'] == 0]
        self_c['NO_DATA'] = '<no data>'
        self_c = self_c[['NO_DATA']]
        self_c.sink(otq.WriteText(output_headers=False, prepend_timestamp=False,
                                  prepend_symbol_name=False, propagate_ticks=True))
        # Do not propagate the ticks further, because we want just to write them to
        # std::out. We have to do that, because otherwise these ticks would go to
        # the query output, which would mess up the real output.
        self_c, _ = self_c[self_c['Time'] != self_c['Time']]
        # We have to merge the branch back into the main branch even though this
        # branch does not generate ticks, so that we do not introduce one more
        # output point; otherwise OneTick would add it to the final output
        # data structure.
        self.sink(otq.Merge(identify_input_ts=False))
        self.source(self_c.node().copy_graph())
        self.node().add_rules(self_c.node().copy_rules())
        self._merge_tmp_otq(self_c)
    def script(self, func: Union[Callable[['Source'], Optional[bool]], str], inplace=False) -> 'Source':
        # TODO: need to narrow the Source object to get rid of undesired methods like aggregations
        """
        Implements a script that runs for every tick.

        Allows to pass a ``func`` that will be applied to every tick.
        A ``func`` can be a python callable; in this case it will be translated
        to a per-tick script. In order to use it in Remote OTP with Ray, the function
        should be decorated with ``@otp.remote``,
        see :ref:`Ray usage examples<apply-remote-context>` for details.
        See :ref:`Per Tick Script Guide <python callable parser>` for a more detailed
        description of the python-to-OneTick code translation and the per-tick
        script features.

        A script written in the per-tick script language can be passed itself
        as a string or as a path to a file with the code. onetick-py doesn't validate
        the script, but configures the schema accordingly.

        Parameters
        ----------
        func: callable, str or path
            - a callable that takes only one parameter: the actual tick,
              which behaves like a `Source` instance
            - or a script in the per-tick script language
            - or a path to a file with a onetick script

        Returns
        -------
        :class:`Source`

        See also
        --------
        | **PER_TICK_SCRIPT** OneTick event processor
        | :py:meth:`onetick.py.Source.apply`
        | :ref:`Per-Tick Script Guide <python callable parser>`

        Examples
        --------
        >>> t = otp.Ticks({'X': [1, 2, 3], 'Y': [4, 5, 6]})
        >>> def fun(tick):
        ...     tick['Z'] = 0
        ...     if tick['X'] + tick['Y'] == 5:
        ...         tick['Z'] = 1
        ...     elif tick['X'] + tick['Y'] * 2 == 15:
        ...         tick['Z'] = 2
        >>> t = t.script(fun)
        >>> otp.run(t)
                             Time  X  Y  Z
        0 2003-12-01 00:00:00.000  1  4  1
        1 2003-12-01 00:00:00.001  2  5  0
        2 2003-12-01 00:00:00.002  3  6  2
        """
        res = self if inplace else self.copy()
        changed_tick_lists = {}
        if callable(func):
            _new_columns, _script = apply_script(func, _EmulateObject(self))
            changed_tick_lists = _EmulateObject.get_changed_tick_lists()
        elif isinstance(func, str):
            if os.path.isfile(func):
                # path to the file with the script
                with open(func) as file:
                    _script = file.read()
            else:
                _script = func
            _new_columns = self._extract_columns_from_script(_script)
        else:
            raise ValueError("Wrong argument was specified, please use a callable or a string with either "
                             "a script in the per-tick script language or a path to it")
        res.sink(otq.PerTickScript(script=_script))
        res.schema.update(**_new_columns)
        for tick_list_name, tick_list_schema in changed_tick_lists.items():
            res.state_vars[tick_list_name]._schema = tick_list_schema
        return res
    def _extract_columns_from_script(self, script):
        result = {}
        type2type = dict(byte=int, short=int, uint=int, long=int, ulong=int, int=int,
                         float=float, double=float, string=str, time32=ott.nsectime,
                         nsectime=ott.nsectime, msectime=ott.msectime, varstring=ott.varstring)
        type2type[int] = int
        types = r"(?P<type>varstring|byte|short|uint|int|ulong|long|" \
                r"float|double|decimal|string|time32|msectime|nsectime|matrix)"
        length = r"(\[(?P<length>\d+)\])?"
        name = r"\s+(?P<name>\w+)\s*=\s*"
        pattern = re.compile(types + length + name)
        for p in re.finditer(pattern, script):
            groupdict = p.groupdict()
            type = type2type.get(groupdict["type"])
            if type:
                length = groupdict["length"]
                if length:
                    length = int(length)
                    if type is str and length != ott.string.DEFAULT_LENGTH:
                        type = ott.string[length]
                result[groupdict["name"]] = type
            else:
                warnings.warn(f"{groupdict['type']} isn't supported for now, so field {groupdict['name']} won't "
                              f"be added to schema.")
        return result
    def to_symbol_param(self):
        """
        Creates a read-only instance with the same columns except Time.

        It is used as the result of a first-stage query with symbol params.

        See also
        --------
        :ref:`Symbol Parameters Objects`
        :ref:`Symbol parameters`

        Examples
        --------
        >>> symbols = otp.Ticks({'SYMBOL_NAME': ['S1', 'S2'], 'PARAM': ['A', 'B']})
        >>> symbol_params = symbols.to_symbol_param()
        >>> t = otp.DataSource('SOME_DB', tick_type='TT')
        >>> t['S_PARAM'] = symbol_params['PARAM']
        >>> result = otp.run(t, symbols=symbols)
        >>> result['S1']
                             Time  X S_PARAM
        0 2003-12-01 00:00:00.000  1       A
        1 2003-12-01 00:00:00.001  2       A
        2 2003-12-01 00:00:00.002  3       A
        """
        return _SymbolParamSource(**self.columns())
    @staticmethod
    def _convert_symbol_to_string(symbol, tmp_otq=None, start=None, end=None, timezone=None):
        if start is adaptive:
            start = None
        if end is adaptive:
            end = None
        if isinstance(symbol, Source):
            symbol = otp.eval(symbol).to_eval_string(tmp_otq=tmp_otq,
                                                     start=start, end=end, timezone=timezone,
                                                     operation_suffix='symbol',
                                                     query_name=None,
                                                     file_suffix=symbol._name_suffix('symbol.otq'))
        if isinstance(symbol, onetick.py.sources.query):
            return symbol.to_eval_string()
        else:
            return symbol

    @staticmethod
    def _construct_multi_branch_graph(branches):
        # TODO: add various checks, e.g. that branches have common parts
        main = branches[0].copy()
        for branch in branches[1:]:
            main.node().add_rules(branch.node().copy_rules())
            main._merge_tmp_otq(branch)
        return main

    def _apply_side_branches(self, side_branches):
        for side_branch in side_branches:
            self.node().add_rules(side_branch.node().copy_rules())
            self._merge_tmp_otq(side_branch)
            self.__sources_keys_dates.update(side_branch.__sources_keys_dates)
            self.__sources_modify_query_times.update(side_branch.__sources_modify_query_times)
            self.__sources_base_ep_func.update(side_branch.__sources_base_ep_func)
            self.__sources_symbols.update(side_branch.__sources_symbols)

    def symbols_for(self, func, *, symbol="SYMBOL_NAME", start="_PARAM_START_TIME_NANOS",
                    end="_PARAM_END_TIME_NANOS"):
        """
        Apply ticks from the current _source as symbols for the calculations in ``func``.

        Parameters
        ----------
        func: callable
            A callable object that takes only one parameter: a reference
            to the current symbol.
        symbol: str
            The column name that contains symbols. Default is 'SYMBOL_NAME'.
            A mandatory column.
        start: str
            The column name that contains the start time for a symbol.
            Default is '_PARAM_START_TIME_NANOS'. An optional column.
        end: str
            The column name that contains the end time for a symbol.
            Default is '_PARAM_END_TIME_NANOS'. An optional column.

        Returns
        -------
        A multi-symbol ticks _source.
        """
        symbols = self.copy()
        if str(symbol) != "SYMBOL_NAME":
            symbols["SYMBOL_NAME"] = symbols[str(symbol)]
        if str(start) != "_PARAM_START_TIME_NANOS":
            symbols["_PARAM_START_TIME_NANOS"] = symbols[str(start)]
        if str(end) != "_PARAM_END_TIME_NANOS":
            symbols["_PARAM_END_TIME_NANOS"] = symbols[str(end)]
        if "SYMBOL_NAME" not in symbols.columns():
            raise Exception("Ticks do not have the SYMBOL_NAME column, but this is a mandatory column.")
        # -------------- #
        num_params = len(inspect.signature(func).parameters)
        if num_params == 0:
            logic = func()
        elif num_params == 1:
            logic = func(symbols.to_symbol_param())
        else:
            raise ValueError(f"It is expected only one parameter from the callback, but {num_params} passed")
            # TODO: test this case
        # -------------- #
        # Find the date range for the logic query.
        # If the date range is not specified for at least one _source, then
        # try to deduce the date range - set to adaptive
        # TODO: we don't need to get a common symbol from _get_date_range,
        # refactor this function, so we don't get an error if the default symbol is not set
        logic_start, logic_end, _ = logic.copy()._get_date_range(default_start=adaptive, default_end=adaptive)
        if logic_start is adaptive:
            logic_start = onetick.py.utils.INF_TIME
        if logic_end is adaptive:
            logic_end = onetick.py.ZERO_TIME
        # The same as above, but for the symbols
        symbols_start, symbols_end, _ = symbols.copy()._get_date_range(default_start=adaptive,
                                                                       default_end=adaptive)
        if symbols_start is adaptive:
            symbols_start = onetick.py.utils.INF_TIME
        if symbols_end is adaptive:
            symbols_end = onetick.py.ZERO_TIME
        # The query interval should be as wide as possible
        start = min(logic_start, symbols_start)
        end = max(logic_end, symbols_end)
        # If nothing is specified, then just use the default
        if start == onetick.py.utils.INF_TIME:
            start = None
        if end == onetick.py.ZERO_TIME:
            end = None
        return _MultiSymbolsSource(symbols.copy(), logic, start=start, end=end)
    def join_with_query(self, query, how="outer", symbol=None, params=None, start=None,
                        end=None, timezone=None, prefix=None, caching=None, keep_time=None,
                        where=None, **kwargs) -> 'Source':
        """
        Executes ``query`` for each tick.

        Parameters
        ----------
        query: callable, Source
            A callable ``query`` should return a :class:`Source`. This object will be
            evaluated by OneTick (not python) for every tick. Note that the python code
            is executed only once, so all python conditional expressions are evaluated
            only once too. The callable should have a ``symbol`` parameter and parameters
            with the names from ``params`` if they are specified in this method.
            If ``query`` is a :class:`Source` object, it is propagated as a query to OneTick.
        how: 'inner', 'outer'
            Type of join. If **inner**, each tick is propagated only if its ``query``
            execution has a non-empty result.
        params: dict
            Mapping of the parameters' names and their values for the ``query``.
            :py:class:`Columns <onetick.py.Column>` can be used as values.
        symbol: str, Operation or Tuple[Union[str, Operation], dict]
            Symbol to use in ``query``; in addition, a dictionary of symbol params
            can be passed along with the symbol.
            If ``symbol`` is `None`, the symbol of the main source is used.
        start: datetime, Operation
            Start time of ``query``. By default, the start time of the main source is used.
        end: datetime, Operation
            End time of ``query``. By default, the end time of the main source is used.
        start_time:
            .. deprecated:: 1.48.4
               The same as ``start``.
        end_time:
            .. deprecated:: 1.48.4
               The same as ``end``.
        timezone : Optional, str
            Timezone of ``query``. By default, the timezone of the main source is used.
        prefix : str
            Prefix for the names of the joined tick fields.
        caching : str
            If `None`, caching is disabled. You can specify caching by using the values:

            * 'cross_symbol': the cache is the same for all symbols
            * 'per_symbol': the cache is different for each symbol.
        keep_time : str
            Name for the joined timestamp column. `None` means no timestamp column is joined.
        where : Operation
            Condition to filter the ticks for which the result of the ``query`` will be joined.

        Returns
        -------
        :class:`Source`
            Source with joined ticks from ``query``

        See also
        --------
        **JOIN_WITH_QUERY** OneTick event processor

        Examples
        --------
        >>> # OTdirective: snippet-name: Special functions.join with query.with an otp data source;
        >>> d = otp.Ticks(Y=[-1])
        >>> d = d.update(dict(Y=1), where=(d.Symbol.name == "a"))
        >>> data = otp.Ticks(X=[1, 2],
        ...                  S=["a", "b"])
        >>> res = data.join_with_query(d, how='inner', symbol=data['S'])
        >>> otp.run(res)[["X", "Y", "S"]]
           X  Y  S
        0  1  1  a
        1  2 -1  b

        >>> d = otp.Ticks(ADDED=[-1])
        >>> d = d.update(dict(ADDED=1), where=(d.Symbol.name == "3"))  # symbol name is always string
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
        >>> res = data.join_with_query(d, how='inner', symbol=(data['A'] + data['B']))  # OTdirective: skip-snippet:;
        >>> df = otp.run(res)
        >>> df[["A", "B", "ADDED"]]
           A  B  ADDED
        0  1  2      1
        1  2  4     -1

        Constants as symbols are also supported

        >>> d = otp.Ticks(ADDED=[d.Symbol.name])
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
        >>> res = data.join_with_query(d, how='inner', symbol=1)  # OTdirective: skip-snippet:;
        >>> df = otp.run(res)
        >>> df[["A", "B", "ADDED"]]
           A  B ADDED
        0  1  2     1
        1  2  4     1

        A function object as the query is also supported
        (note it will be executed only once in the python code)

        >>> def func(symbol):
        ...     d = otp.Ticks(TYPE=["six"])
        ...     d = d.update(dict(TYPE="three"), where=(symbol.name == "3"))  # symbol is always converted to string
        ...     d["TYPE"] = symbol['PREF'] + d["TYPE"] + symbol['POST']
        ...     return d
        >>> # OTdirective: snippet-name: Special functions.join with query.with a function
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
        >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B'], dict(PREF="_", POST="$")))
        >>> df = otp.run(res)
        >>> df[["A", "B", "TYPE"]]
           A  B     TYPE
        0  1  2  _three$
        1  2  4    _six$

        It's possible to pass the source itself as a list of symbol parameters,
        which will make all of its fields accessible through the "symbol" object

        >>> def func(symbol):
        ...     d = otp.Ticks(TYPE=["six"])
        ...     d["TYPE"] = symbol['PREF'] + d["TYPE"] + symbol['POST']
        ...     return d
        >>> # OTdirective: snippet-name: Source operations.join with query.source as symbol;
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4], PREF=["_", "$"], POST=["$", "_"])
        >>> res = data.join_with_query(func, how='inner', symbol=data)
        >>> df = otp.run(res)
        >>> df[["A", "B", "TYPE"]]
           A  B   TYPE
        0  1  2  _six$
        1  2  4  $six_

        The examples above can be rewritten by using otq params instead of symbol params.
        OTQ parameters are global to the query, while symbol parameters can be redefined
        by bound symbols.

        >>> def func(symbol, pref, post):
        ...     d = otp.Ticks(TYPE=["six"])
        ...     d = d.update(dict(TYPE="three"), where=(symbol.name == "3"))  # symbol is always converted to string
        ...     d["TYPE"] = pref + d["TYPE"] + post
        ...     return d
        >>> # OTdirective: snippet-name: Special functions.join with query.with a function that takes params;
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4])
        >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B']),
        ...                            params=dict(pref="_", post="$"))
        >>> df = otp.run(res)
        >>> df[["A", "B", "TYPE"]]
           A  B     TYPE
        0  1  2  _three$
        1  2  4    _six$

        Some or all otq params can also be columns or expressions

        >>> def func(symbol, pref, post):
        ...     d = otp.Ticks(TYPE=["six"])
        ...     d = d.update(dict(TYPE="three"), where=(symbol.name == "3"))  # symbol is always converted to string
        ...     d["TYPE"] = pref + d["TYPE"] + post
        ...     return d
        >>> # OTdirective: snippet-name: Special functions.join with query.with a function that takes params from fields;  # noqa
        >>> data = otp.Ticks(A=[1, 2], B=[2, 4], PREF=["^", "_"], POST=["!", "$"])
        >>> res = data.join_with_query(func, how='inner', symbol=(data['A'] + data['B']),
        ...                            params=dict(pref=data["PREF"] + ".", post=data["POST"]))
        >>> df = otp.run(res)
        >>> df[["A", "B", "TYPE"]]
           A  B      TYPE
        0  1  2  ^.three!
        1  2  4    _.six$

        You can specify the start and end time of the query to select specific ticks from the db

        >>> # OTdirective: snippet-name: Special functions.join with query.passing start/end times;
        >>> d = otp.Ticks(Y=[1, 2])
        >>> data = otp.Ticks(X=[1, 2])
        >>> start = datetime(2003, 12, 1, 0, 0, 0, 1000, tzinfo=pytz.timezone("EST5EDT"))
        >>> end = datetime(2003, 12, 1, 0, 0, 0, 3000, tzinfo=pytz.timezone("EST5EDT"))
        >>> res = data.join_with_query(d, how='inner', start=start, end=end)
        >>> otp.run(res)
                             Time  Y  X
        0 2003-12-01 00:00:00.000  1  1
        1 2003-12-01 00:00:00.000  2  1
        2 2003-12-01 00:00:00.001  1  2
        3 2003-12-01 00:00:00.001  2  2

        Use the keep_time param to keep or rename the original timestamp column

        >>> # OTdirective: snippet-name: Special functions.join with query.keep the timestamps of the joined ticks;
        >>> d = otp.Ticks(Y=[1, 2])
        >>> data = otp.Ticks(X=[1, 2])
        >>> res = data.join_with_query(d, how='inner', keep_time="ORIG_TIME")
        >>> otp.run(res)
                             Time  Y               ORIG_TIME  X
        0 2003-12-01 00:00:00.000  1 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.000  2 2003-12-01 00:00:00.001  1
        2 2003-12-01 00:00:00.001  1 2003-12-01 00:00:00.000  2
        3 2003-12-01 00:00:00.001  2 2003-12-01 00:00:00.001  2
        """
        # TODO: check if join_with_query checks the schema of the joined source against
        # the primary source, by itself or with process_by_group

        def _columns_to_params(columns, fix_symbol_time=False):
            params_list = []
            for key, value in columns.items():
                dtype = ott.get_object_type(value)
                convert_rule = "'" + key + "=' + "
                if dtype is str:
                    if are_strings(getattr(value, "dtype", None)):
                        convert_rule += str(value)
                    else:
                        convert_rule += '"' + value + '"'
                elif dtype is otp.msectime:
                    convert_rule += "tostring(GET_MSECS(" + str(value) + "))"
                elif dtype is otp.nsectime:
                    if key == '_SYMBOL_TIME' and fix_symbol_time:
                        # hack to support passing _SYMBOL_TIME to the called query as a parameter
                        # TODO: create a dedicated interface for _SYMBOL_TIME
                        convert_rule += "tostring(GET_MSECS(" + str(value) + "))"
                    else:
                        # this matches the common way onetick converts nanoseconds to symbol parameters
                        # yes, this is the simplest way to write this expression
                        # TODO: think if we want this conversion to be applied to query parameters also
                        convert_rule += "tostring(GET_MSECS(" + str(value) + \
                            "))+'.'+SUBSTR(NSECTIME_FORMAT('%J'," + str(value) + ",_TIMEZONE),3,6)"
                else:
                    convert_rule += "tostring(" + str(value) + ")"
                params_list.append(convert_rule)
            return "+','+".join(params_list)

        if params is None:
            params = {}

        # the "symbol" parameter can contain a symbol name (string, field, operation etc.),
        # a symbol parameter list (dict, Source, _SymbolParamSource),
        # or both together as a tuple
        def _check_and_convert_symbol(symbol):
            if isinstance(symbol, _Operation):
                # TODO: PY-35
                return True, f"tostring({symbol})"
            elif isinstance(symbol, str):
                return True, f"'{symbol}'"
            elif type(symbol) in {int, float}:  # constant
                return True, f"tostring({symbol})"
            elif symbol is None:
                # this is necessary to distinguish None (which is a valid value for symbol)
                # from invalid values
                return True, None
            else:
                return False, None

        def _convert_symbol_param_and_columns(symbol_param):
            """
            We need to create two objects from a symbol param
            (a dict, a Source or a _SymbolParamSource):
            1. A dictionary of columns to generate the list of symbol parameters
               for the JOIN_WITH_QUERY EP
            2. A _SymbolParamSource object to pass to the source function if necessary
            """
            if isinstance(symbol_param, dict):
                converted_symbol_param_columns = symbol_param
                converted_symbol_param = _SymbolParamSource(**{
                    key: ott.get_object_type(column) for key, column in symbol_param.items()
                })
            elif isinstance(symbol_param, _Source):
                converted_symbol_param_columns = {
                    field_name: symbol_param[field_name]
                    for field_name in symbol_param.columns(skip_meta_fields=True).keys()
                }
                converted_symbol_param = symbol_param.to_symbol_param()
            elif isinstance(symbol_param, _SymbolParamSource):
                converted_symbol_param_columns = {
                    field_name: symbol_param[field_name] for field_name in symbol_param.schema.keys()
                }
                converted_symbol_param = symbol_param
            else:
                converted_symbol_param_columns = None
                converted_symbol_param = None
            return converted_symbol_param_columns, converted_symbol_param

        # if "symbol" is a tuple, we unpack it
        if isinstance(symbol, tuple) and len(symbol) == 2:
            symbol_name, symbol_param = symbol
        else:
            # see if "symbol" contains a symbol name or symbol params
            is_symbol, _ = _check_and_convert_symbol(symbol)
            if is_symbol:
                symbol_name = symbol
                symbol_param = {}
            else:
                symbol_name = None
                symbol_param = symbol
        _, converted_symbol_name = _check_and_convert_symbol(symbol_name)
        # the default symbol name should be: _SYMBOL_NAME if it is not empty, else _NON_EXISTING_SYMBOL_
        # this way we will force JWQ to substitute the symbol with any symbol parameters we may have passed
        # otherwise (if an empty symbol name is passed to JWQ), it will not substitute either the symbol
        # name or the symbol parameters, and so the symbol parameters may get lost
        # see BDS-263
        if converted_symbol_name is None:
            converted_symbol_name = "CASE(_SYMBOL_NAME,'','_NON_EXISTING_SYMBOL',_SYMBOL_NAME)"
        converted_symbol_param_columns, converted_symbol_param = _convert_symbol_param_and_columns(symbol_param)
        if converted_symbol_param is None:
            # we couldn't interpret "symbols" as either a symbol name or symbol parameters
            raise ValueError('"symbol" parameter has a wrong format! It should be a symbol name, a symbol parameter '
                             'object (dict or Source), or a tuple containing both')

        # prepare temporary file
        # ------------------------------------ #
        converted_params = prepare_params(**params)
        if isinstance(query, Source):
            sub_source = query
        else:
            # inspect function
            # -------
            sig = inspect.signature(query)
            if "symbol" in sig.parameters:
                if "symbol" in converted_params.keys():
                    raise AttributeError('"params" contains key "symbol", which is reserved for symbol parameters. '
                                         'Please, rename this parameter to another name')
                converted_params["symbol"] = converted_symbol_param  # type: ignore
            sub_source = query(**converted_params)
        sub_source = self._process_keep_time_param(keep_time, sub_source)
        if not sub_source._is_unbound_required():
            sub_source += onetick.py.sources.Empty()
        params_str = _columns_to_params(params, fix_symbol_time=True)
        symbol_params_str = _columns_to_params(converted_symbol_param_columns)
        columns = {}
        columns.update(self._get_columns_with_prefix(sub_source, prefix))
        columns.update(self.columns(skip_meta_fields=True))
        res = self.copy(columns=columns)
        res._merge_tmp_otq(sub_source)
        query_name = sub_source._store_in_tmp_otq(res._tmp_otq, symbols='_NON_EXISTING_SYMBOL_',
                                                  operation_suffix="join_with_query")
        # TODO: combine with _convert_symbol_to_string
        # ------------------------------------ #
        if where is not None and how != 'outer':
            raise ValueError('The `where` parameter can be used only for outer join')
        join_params = dict(
            otq_query=f'"THIS::{query_name}"',
            join_type=how.upper(),
            otq_query_params=params_str,
            symbol_params=symbol_params_str,
            where=str(where._make_python_way_bool_expression()) if where is not None else '',
        )
        start_time = kwargs.get('start_time', start)
        end_time = kwargs.get('end_time', end)
        self._fill_aux_params_for_jwq(join_params, caching, end_time, prefix, start_time,
                                      converted_symbol_name, timezone)
        res.sink(otq.JoinWithQuery(**join_params))
        res.sink(otq.Table(fields=",".join([ott.type2str(dtype) + " " + name
                                            for name, dtype in columns.items()]),
                           keep_input_fields=True))
        res.sink(otq.Passthrough(fields="TIMESTAMP", drop_fields=True))
        return res
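    # Usage sketch (not from the original source): the ``where`` and ``prefix``
    # parameters, which have no example in the docstring above. The joined query
    # runs only for ticks matching ``where`` (outer join only, per the check in
    # the implementation); joined fields get the given prefix.
    # >>> d = otp.Ticks(Y=[1])
    # >>> data = otp.Ticks(X=[1, 2])
    # >>> res = data.join_with_query(d, how='outer', prefix='J_',
    # ...                            where=data['X'] > 1)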
    @property
    def state_vars(self) -> StateVars:
        """
        Provides access to state variables

        Returns
        -------
        State Variables: Dict[str, state variable]
            State variables; you can access one by its name.

        See Also
        --------
        | `State Variables \
          <../../static/getting_started/variables_and_data_structures.html#variables-and-data-structures>`_
        | **DECLARE_STATE_VARIABLES** OneTick event processor
        """
        return self.__dict__['_state_vars']

    __invalid_query_name_symbols_regex = re.compile('[^a-zA-Z0-9_]')

    def __remove_invalid_symbols(self, s):
        """ Replaces symbols that cannot be put in query names with '_' """
        return self.__invalid_query_name_symbols_regex.sub('_', s)

    def get_name(self, remove_invalid_symbols=False):
        """
        Returns the source name.
        If remove_invalid_symbols == True, the returned name only contains
        symbols that can be put in query names.
        """
        if remove_invalid_symbols and self.__name:
            return self.__remove_invalid_symbols(self.__name)
        else:
            return self.__name

    def set_name(self, new_name):
        """
        Sets the source name.
        The source name cannot be an empty string, but it can be None.
        """
        assert isinstance(new_name, str) or new_name is None, "Source name must be a string or None."
        if new_name is not None:
            assert new_name != '', "Source name must be a non-empty string."
        self.__name = new_name

    def _name_suffix(self, suffix, separator='.', remove_invalid_symbols=False):
        if remove_invalid_symbols:
            suffix = self.__remove_invalid_symbols(suffix)
            separator = self.__remove_invalid_symbols(separator)
            name = self.get_name(remove_invalid_symbols=True)
        else:
            name = self.__name
        return f'{separator}{name}{separator}{suffix}' if name else f'{separator}{suffix}'

    @property
    def schema(self) -> Schema:
        """
        Represents the actual python data schema in the column-name -> type format.

        For example, it could be used after :meth:`Source.sink` to adjust the schema.

        Returns
        -------
        Schema

        See Also
        --------
        Source.sink

        Examples
        --------
        >>> data = otp.Ticks([['X', 'Y', 'Z'],
        ...                   [  1, 0.5, 'abc']])
        >>> data['T'] = data['Time']
        >>> data.schema
        {'X': <class 'int'>, 'Y': <class 'float'>, 'Z': <class 'str'>, 'T': <class 'onetick.py.types.nsectime'>}

        >>> data.schema['X']
        <class 'int'>

        >>> data.schema['X'] = float
        >>> data.schema['X']
        <class 'float'>

        >>> 'W' in data.schema
        False
        >>> data.schema['W'] = otp.nsectime
        >>> 'W' in data.schema
        True
        >>> data.schema['W']
        <class 'onetick.py.types.nsectime'>
        """
        schema = self.columns(skip_meta_fields=True)
        hidden_columns = {'Time': ott.nsectime, 'TIMESTAMP': ott.nsectime}
        return Schema(_base_source=self, _hidden_columns=hidden_columns, **schema)

    def set_schema(self, **kwargs):
        """
        Set the schema of the source.

        Note: this method affects only the python part and won't make any db queries.
        It is used to set the schema after reading from a db or applying a complex query.

        .. deprecated::
            please use the :property:`Source.schema` to access and adjust the schema.

        Parameters
        ----------
        kwargs
            schema in the column_name=type format

        Examples
        --------
        Python can't follow a low-level change of columns, e.g. when a complex query
        or a per-tick script is sinked.

        >>> data = otp.Ticks(dict(A=[1, 2], B=["a", "b"]))
        >>> data.sink(otq.AddField(field='Z', value='5'))  # doctest: +SKIP
        >>> data.columns(skip_meta_fields=True)
        {'A': <class 'int'>, 'B': <class 'str'>}
        >>> # OTdirective: snippet-name: Arrange.schema.set;
        >>> data.set_schema(A=int, B=str, Z=int)
        >>> data.columns(skip_meta_fields=True)
        {'A': <class 'int'>, 'B': <class 'str'>, 'Z': <class 'int'>}
        """
        self.drop_columns()
        for name, dtype in kwargs.items():
            dtype = ott.get_source_base_type(dtype)
            self[(name, dtype)]

    def _process_keep_time_param(self, keep_time, sub_source):
        if keep_time == "TIMESTAMP":
            raise ValueError("TIMESTAMP is a reserved OneTick name, please specify another one.")
        if keep_time in self.columns():
            raise ValueError(f"{keep_time} column is already present.")
        sub_source = sub_source.copy()
        if keep_time:
            sub_source[keep_time] = sub_source["Time"]
        return sub_source

    def _get_columns_with_prefix(self, sub_source, prefix) -> dict:
        sub_source_columns = sub_source.schema
        if prefix is None:
            prefix = ""
        if not isinstance(prefix, str):
            raise ValueError("Only string constants are supported for now.")
        new_columns = {prefix + name: dtype for name, dtype in sub_source_columns.items()}
        same_names = set(new_columns) & set(self.schema)
        if same_names:
            raise ValueError(f"After applying prefix some columns aren't unique: {', '.join(same_names)}.")
        return new_columns

    def _fill_aux_params_for_jwq(self, join_params, caching, end_time, prefix, start_time,
                                 symbol_name, timezone):
        if symbol_name:
            join_params["symbol_name"] = symbol_name
        if prefix is not None:
            join_params["prefix_for_output_ticks"] = str(prefix)
        if caching:
            supported = "cross_symbol", "per_symbol"
            if caching in supported:
                join_params["caching_scope"] = caching
            else:
                raise ValueError(f"Unknown value for caching param, please use None or any of {supported}.")
        self._fill_time_param_for_jwq(join_params, start_time, end_time, timezone)

    def _fill_time_param_for_jwq(self, join_params, start_time, end_time, timezone):
        self._process_start_or_end_of_jwq(join_params, start_time, "start_timestamp")
        self._process_start_or_end_of_jwq(join_params, end_time, "end_timestamp")
        if timezone:
            join_params["timezone"] = f"'{timezone}'"
        # This needs to be done, but it may break some of the existing code.
        # Need to enable later!
        # else:
        #     join_params["timezone"] = "_TIMEZONE"  # this may break something, need to test

    def _process_start_or_end_of_jwq(self, join_params, time, param_name):
        if time is not None:
            if isinstance(time, (datetime, otp.dt)):
                join_params[f"{param_name}"] = time.timestamp() * 1000
            elif isinstance(time, _Operation):
                join_params[f"{param_name}"] = str(time)
            else:
                raise ValueError(f"{param_name} should be a datetime.datetime instance or a OneTick expression")
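    # Usage sketch (an assumption, not from the original source): declaring and
    # reading a state variable through the ``state_vars`` accessor above, in the
    # declaration-by-assignment style shown in the onetick.py "Variables and
    # data structures" guide referenced in the docstring.
    # >>> data = otp.Tick(A=1)
    # >>> data.state_vars['S'] = 0      # assumed declaration-by-assignment API
    # >>> data['B'] = data.state_vars['S']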
    @inplace_operation
    def transpose(self, inplace: bool = False, direction: Literal['rows', 'columns'] = 'rows',
                  n: Optional[int] = None) -> Optional['Source']:
        """
        Data transposing. The main idea is joining many ticks into one
        or splitting one tick into many.

        Parameters
        ----------
        inplace: bool, default=False
            if `True` the method modifies the current object,
            otherwise it returns a modified copy of the object.
        direction: 'rows', 'columns', default='rows'
            - `rows`: join certain input ticks (depending on other parameters) with
              the preceding ones. Fields of each tick are added to the output tick
              and their names are suffixed with **_K** where **K** is the positional
              number of the tick (starting from 1) in reverse order. So the fields of
              the current tick are suffixed with **_1**, the fields of the previous
              tick with **_2** and so on.
            - `columns`: the operation opposite to `rows`. It splits each input tick
              into several output ticks. Each input tick must have fields with names
              suffixed with **_K** where **K** is the positional number of the tick
              (starting from 1) in reverse order.
        n: Optional[int], default=None
            must be specified only if ``direction`` is 'rows'.
            Joins every **n**-th tick with the **n-1** preceding ticks.

        Returns
        -------
        If the ``inplace`` parameter is `True` the method returns `None`,
        otherwise it returns a modified copy of the object.

        See also
        --------
        **TRANSPOSE** OneTick event processor

        Examples
        --------
        Merging two ticks into one.

        >>> data = otp.Ticks(dict(A=[1, 2],
        ...                       B=[3, 4]))
        >>> data = data.transpose(direction='rows', n=2)  # OTdirective: skip-snippet:;
        >>> otp.run(data)
                             Time             TIMESTAMP_1  A_1  B_1 TIMESTAMP_2  A_2  B_2
        0 2003-12-01 00:00:00.001 2003-12-01 00:00:00.001    2    4  2003-12-01    1    3

        And splitting them back into two.

        >>> data = data.transpose(direction='columns')  # OTdirective: skip-snippet:;
        >>> otp.run(data)
                             Time  A  B
        0 2003-12-01 00:00:00.000  1  3
        1 2003-12-01 00:00:00.001  2  4
        """
        direction_map = {'rows': 'ROWS_TO_COLUMNS', 'columns': 'COLUMNS_TO_ROWS'}
        n = '' if n is None else n
        self.sink(otq.Transpose(direction=direction_map[direction], key_constraint_values=n))
        # TODO: we should change the source's schema after transposing
        return self
[docs]
    @inplace_operation
    def process_by_group(self, process_source_func, group_by=None, source_name=None,
                         inplace=False) -> Union['Source', Tuple['Source', ...], None]:
        """
        Groups data by ``group_by`` fields, runs ``process_source_func`` for each group
        and merges the outputs of all groups.

        Note that ``process_source_func`` will be converted to a OneTick object and passed
        to the query, which means the Python callable will be called only once.

        Parameters
        ----------
        process_source_func: callable
            ``process_source_func`` should take a :class:`Source`, apply the necessary logic
            and return it, or return a tuple of :class:`Source` objects; in the latter case
            all of them should have a common root, which is the input :class:`Source`.
        group_by: list
            A list of field names to group input ticks by.
            If ``group_by`` is None, then no group-by fields are defined and the logic of
            ``process_source_func`` is applied to all input ticks at once.
        source_name: str
            A name for the source that represents all of the grouped sources.
            Can be passed here or as a name of the inner sources; if passed both ways,
            the names should be consistent.
        inplace: bool
            If True, nothing is returned and the changes are applied to the current query;
            otherwise the changed query is returned.

        Returns
        -------
        :class:`Source`, Tuple[:class:`Source`] or None

        See also
        --------
        **GROUP_BY** OneTick event processor

        Examples
        --------
        >>> # OTdirective: snippet-name: Arrange.group.single output;
        >>> d = otp.Ticks(X=[1, 1, 2, 2],
        ...               Y=[1, 2, 3, 4])
        >>>
        >>> def func(source):
        ...     return source.first()
        >>>
        >>> res = d.process_by_group(func, group_by=['X'])
        >>> otp.run(res)[["X", "Y"]]
           X  Y
        0  1  1
        1  2  3

        >>> d = otp.Ticks(X=[1, 1, 2, 2],
        ...               Y=[1, 2, 1, 3])
        >>>
        >>> def func(source):
        ...     source['Z'] = source['X']
        ...     source2 = source.copy()
        ...     source = source.first()
        ...     source2 = source2.last()
        ...     return source, source2
        >>> # OTdirective: snippet-name: Arrange.group.multiple output;
        >>> res1, res2 = d.process_by_group(func, group_by=['Y'])
        >>> df1 = otp.run(res1)
        >>> df2 = otp.run(res2)
        >>> df1[['X', 'Y', 'Z']]
           X  Y  Z
        0  1  1  1
        1  1  2  1
        2  2  3  2
        >>> df2[['X', 'Y', 'Z']]  # OTdirective: skip-snippet:;
           X  Y  Z
        0  1  2  1
        1  2  1  2
        2  2  3  2
        """
        if group_by is None:
            group_by = []
        if inplace:
            main_source = self
        else:
            main_source = self.copy()
        input_schema = main_source.columns(skip_meta_fields=True)
        for field in group_by:
            if field not in input_schema:
                raise ValueError(f"Group by field name {field} not present in input source schema")
        process_source_root = onetick.py.sources.Custom(tick_type="ANY", schema_policy="manual", **input_schema)
        if source_name:
            process_source_root.set_name(source_name)
        process_sources = process_source_func(process_source_root)
        if isinstance(process_sources, Source):
            # returned one source
            process_sources = [process_sources]
        elif len(process_sources) == 1:
            # returned one source as an iterable
            pass
        else:
            # returned multiple sources
            if inplace:
                raise ValueError("Cannot use inplace=True with multi-source processing function!")
        num_source = 0
        for process_source in process_sources:
            output_schema = process_source.columns(skip_meta_fields=True)
            if process_source.get_name():
                if not process_source_root.get_name():
                    process_source_root.set_name(process_source.get_name())
                if process_source_root.get_name() != process_source.get_name():
                    warnings.warn("Different strings passed as names for the root source used in "
                                  f"process_by_group: '{process_source.get_name()}' "
                                  f"and '{process_source_root.get_name()}'")
            # removing key fields from output schema since they will be
            # added by the GROUP_BY EP
            process_source.drop([field for field in group_by if field in output_schema], inplace=True)
            process_source.sink(otq.Passthrough().node_name(f"OUT_{num_source}"))
            process_source_root.node().add_rules(process_source.node().copy_rules())
            main_source._merge_tmp_otq(process_source)
            num_source += 1
        query_name = process_source_root._store_in_tmp_otq(main_source._tmp_otq,
                                                           operation_suffix="group_by",
                                                           add_passthrough=False)
        process_path = f'THIS::{query_name}'
        num_outputs = len(process_sources)
        # we shouldn't set named outputs if GROUP_BY EP has only one output due to onetick behaviour
        if num_outputs == 1:
            outputs = ""
        else:
            outputs = ",".join([f"OUT_{i}" for i in range(0, num_outputs)])
        main_source.sink(otq.GroupBy(key_fields=",".join(group_by),
                                     query_name=process_path,
                                     outputs=outputs))
        output_sources = []
        for num_output in range(0, num_outputs):
            if num_outputs == 1 and inplace:
                output_source = main_source
            else:
                output_source = main_source.copy()
            if num_outputs > 1:
                output_source.node().out_pin(f"OUT_{num_output}")
            # setting schema after processing
            output_schema = process_sources[num_output].columns(skip_meta_fields=True)
            for field in group_by:
                output_schema[field] = input_schema[field]
            for field, field_type in output_schema.items():
                output_source.schema[field] = field_type
            output_source = output_source[[field for field in output_schema]]
            output_source._merge_tmp_otq(main_source)
            output_sources.append(output_source)
        if num_outputs == 1:
            return output_sources[0]
        else:
            return tuple(output_sources)
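    # Because the processing callable runs exactly once, at graph construction
    # time, it must describe per-group logic with OneTick operations rather
    # than branch on tick values in Python. A minimal sketch (column names
    # follow the docstring examples above):
    #
    #   >>> def per_group(src):
    #   ...     src['Z'] = src['X'] * 2   # built once, evaluated for every group
    #   ...     return src.last()
    #   >>> res = d.process_by_group(per_group, group_by=['X'])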
    def unite_columns(self, sep="", *, apply_str=False):
        """
        Join values of all columns into one string.

        The method unites all fields into one string, just like the Python ``join`` method.
        All fields should be strings, otherwise an error is raised. To change this behavior,
        the ``apply_str=True`` argument should be specified; in this case all fields will be
        converted to the string type before joining.

        Parameters
        ----------
        sep: str
            Separator between values, empty string by default.
        apply_str: bool
            If set, every column will be converted to string during the operation.
            False by default.

        Returns
        -------
        result: column
            Column with str type

        Examples
        --------
        >>> # OTdirective: snippet-name: Arrange.join columns as strings;
        >>> data = otp.Ticks(X=[1, 2, 3], A=["A", "A", "A"], B=["A", "B", "C"])
        >>> data["S_ALL"] = data.unite_columns(sep=",", apply_str=True)
        >>> data["S"] = data[["A", "B"]].unite_columns()
        >>> otp.run(data)[["S", "S_ALL"]]
            S  S_ALL
        0  AA  1,A,A
        1  AB  2,A,B
        2  AC  3,A,C
        """
        if apply_str:
            cols = (self[col].apply(str) for col in self.schema)
        else:
            not_str = [name for name, t in self.schema.items() if not are_strings(t)]
            if not_str:
                raise ValueError(f"All joining columns should be strings, while {', '.join(not_str)} "
                                 f"are not. Specify `apply_str=True` for automatic type conversion")
            else:
                cols = (self[col] for col in self.schema)
        return functools.reduce(lambda x, y: x + sep + y, cols)

    # Aggregations copy
    # we need these functions to store and collect documentation
    # the copy_method decorator will:
    #   - set the docstring (comparing the docstring of the donor function and the method docstring)
    #   - apply the same signature from the donor function + self
    #   - for mimic=True, apply the agg function as is
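    # With mimic=True (the copy_method default), a wrapper with a `pass` body
    # like `high` or `first` below is assumed to reduce to applying the donor
    # aggregation directly, roughly:
    #
    #   >>> data = otp.Ticks(X=[1, 2, 3, 4])
    #   >>> data = first_tick().apply(data)   # assumed equivalent of data.first()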
[docs]
    @copy_method(high_tick)
    def high(self):
        """
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000])
        >>> data = data.high(['X'], 2)  # OTdirective: snippet-name: Aggregations.high tick;
        >>> otp.run(data)
                             Time  X
        0 2003-12-01 00:00:01.500  3
        1 2003-12-01 00:00:03.000  4
        """
        pass
[docs]
    @copy_method(low_tick)
    def low(self):
        """
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000])
        >>> data = data.low(['X'], 2)  # OTdirective: snippet-name: Aggregations.low tick;
        >>> otp.run(data)
                Time  X
        0 2003-12-01 00:00:00  1
        1 2003-12-01 00:00:01  2
        """
        pass
[docs]
    @copy_method(first_tick)
    def first(self):
        """
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3, 4])
        >>> data = data.first()  # OTdirective: snippet-name: Aggregations.first;
        >>> otp.run(data)
                Time  X
        0 2003-12-01  1
        """
        pass
[docs]
    @copy_method(last_tick)
    def last(self):
        """
        Examples
        --------
        >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000])
        >>> data = data.last()  # OTdirective: snippet-name: Aggregations.last;
        >>> otp.run(data)
                Time  X
        0 2003-12-01 00:00:03  4
        """
        pass
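    # The second positional argument in the high/low examples above is the
    # number of ticks to keep. A hedged sketch combining it with bucketing;
    # the bucket_interval parameter is assumed from the donor signatures
    # (documented via _bucket_interval_doc) and may differ:
    #
    #   >>> data = otp.Ticks(X=[1, 2, 3, 4], offset=[0, 1000, 1500, 3000])
    #   >>> data = data.low(['X'], 2, bucket_interval=2)  # 2 lowest ticks per 2-second bucket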
[docs]
    @copy_method(distinct, mimic=False)
    def distinct(self, *args, **kwargs):
        """
        Examples
        --------
        >>> data = otp.Ticks(dict(x=[1, 3, 1, 5, 3]))
        >>> data = data.distinct('x')  # OTdirective: snippet-name: Aggregations.distinct;
        >>> otp.run(data)
                Time  x
        0 2003-12-04  1
        1 2003-12-04  3
        2 2003-12-04  5
        """
        if 'bucket_interval_units' in kwargs:
            kwargs['bucket_units'] = kwargs.pop('bucket_interval_units')
        agg = distinct(*args, **kwargs)
        return agg.apply(self)
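    # The kwargs shim above keeps the legacy 'bucket_interval_units' spelling
    # working by renaming it to 'bucket_units'. Assuming a valid unit value,
    # these two calls are therefore equivalent:
    #
    #   >>> data.distinct('x', bucket_interval_units='seconds')  # legacy spelling
    #   >>> data.distinct('x', bucket_units='seconds')           # current spelling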
[docs]
    @copy_method(high_time, mimic=False)  # mimic=False for backward compatibility
    def high_time(self, *args, **kwargs):
        """
        Returns the timestamp of the tick with the highest value of the input field.

        .. deprecated:: 1.14.5
           Use :py:func:`.high_time` instead

        See Also
        --------
        :py:func:`.high_time`
        """
        warnings.warn(f"{self.__class__.__name__}.{inspect.currentframe().f_code.co_name} is deprecated. "
                      f"Use otp.agg.{inspect.currentframe().f_code.co_name} instead",
                      DeprecationWarning, stacklevel=2)
        agg = high_time(*args, **kwargs)
        return agg.apply(self, 'VALUE')
[docs]
    @copy_method(low_time, mimic=False)  # mimic=False for backward compatibility
    def low_time(self, *args, **kwargs):
        """
        Returns the timestamp of the tick with the lowest value of the input field.

        .. deprecated:: 1.14.5
           Use :py:func:`.low_time` instead

        See Also
        --------
        :py:func:`.low_time`
        """
        warnings.warn(f"{self.__class__.__name__}.{inspect.currentframe().f_code.co_name} is deprecated. "
                      f"Use otp.agg.{inspect.currentframe().f_code.co_name} instead",
                      DeprecationWarning, stacklevel=2)
        agg = low_time(*args, **kwargs)
        return agg.apply(self, 'VALUE')
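    # The deprecation warnings above point at the module-level aggregations;
    # a sketch of the suggested replacement, mirroring the method bodies
    # (the 'X' input field is illustrative):
    #
    #   >>> data = otp.agg.low_time('X').apply(data, 'VALUE')  # instead of data.low_time('X')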
[docs]
    @inplace_operation
    def add_prefix(self, prefix, inplace=False) -> Optional['Source']:
        """
        Adds a prefix to all column names.

        Parameters
        ----------
        prefix : str
            String prefix to add to all columns.
        inplace : bool
            The flag controls whether the operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.

        Returns
        -------
        :class:`Source` or ``None``

        Examples
        --------
        >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1')
        >>> data = data.add_prefix('test_')
        >>> otp.run(data)
                             Time  test_X
        0 2003-12-01 00:00:00.000       1
        1 2003-12-01 00:00:00.001       2
        2 2003-12-01 00:00:00.002       3

        >>> data.schema
        {'test_X': <class 'int'>}

        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL')
        >>> data = data.add_prefix('test_')
        >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))
                             Time  test_PRICE  test_SIZE
        0 2022-03-01 00:00:00.000         1.3        100
        1 2022-03-01 00:00:00.001         1.4         10
        2 2022-03-01 00:00:00.002         1.4         50

        >>> data.schema
        {'test_PRICE': <class 'float'>}

        >>> data = otp.Tick(X=1, XX=2)
        >>> data.add_prefix('X')
        Traceback (most recent call last):
          ...
        AttributeError: Column XX already exists, please, use another prefix
        """
        if ' ' in prefix:
            raise AttributeError(f'There is a space in the prefix: {prefix}')
        schema = self.schema
        for column_name in schema:
            new_column_name = f'{prefix}{column_name}'
            if new_column_name in self.__dict__:
                raise AttributeError(f'Column {new_column_name} already exists, please, use another prefix')
        for column_name in schema:
            new_column_name = f'{prefix}{column_name}'
            self.__dict__[column_name].rename(new_column_name, update_parent_object=False)
            self.__dict__[new_column_name] = self.__dict__[column_name]
            del self.__dict__[column_name]
        self.sink(otq.RenameFieldsEp(rename_fields=f'(.*)={prefix}\\1', use_regex=True))
        return self
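    # A common reason to prefix (or suffix, see below) columns is keeping field
    # names distinct before branches are combined. A minimal sketch (values are
    # illustrative):
    #
    #   >>> left = otp.Tick(X=1).add_prefix('L_')
    #   >>> right = otp.Tick(X=2).add_prefix('R_')
    #   >>> both = otp.merge([left, right])  # fields L_X and R_X stay distinguishable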
[docs]
    @inplace_operation
    def add_suffix(self, suffix, inplace=False) -> Optional['Source']:
        """
        Adds a suffix to all column names.

        Parameters
        ----------
        suffix : str
            String suffix to add to all columns.
        inplace : bool
            The flag controls whether the operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.

        Returns
        -------
        :class:`Source` or ``None``

        Examples
        --------
        >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1')
        >>> data = data.add_suffix('_test')
        >>> otp.run(data)
                             Time  X_test
        0 2003-12-01 00:00:00.000       1
        1 2003-12-01 00:00:00.001       2
        2 2003-12-01 00:00:00.002       3

        >>> data.schema
        {'X_test': <class 'int'>}

        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL')
        >>> data = data.add_suffix('_test')
        >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))
                             Time  PRICE_test  SIZE_test
        0 2022-03-01 00:00:00.000         1.3        100
        1 2022-03-01 00:00:00.001         1.4         10
        2 2022-03-01 00:00:00.002         1.4         50

        >>> data.schema
        {'PRICE_test': <class 'float'>}

        >>> data = otp.Tick(X=1, XX=2)
        >>> data.add_suffix('X')
        Traceback (most recent call last):
          ...
        AttributeError: Column XX already exists, please, use another suffix
        """
        if ' ' in suffix:
            raise AttributeError(f'There is a space in the suffix: {suffix}')
        schema = self.schema
        for column_name in schema:
            new_column_name = f'{column_name}{suffix}'
            if new_column_name in self.__dict__:
                raise AttributeError(f'Column {new_column_name} already exists, please, use another suffix')
        for column_name in schema:
            new_column_name = f'{column_name}{suffix}'
            self.__dict__[column_name].rename(new_column_name, update_parent_object=False)
            self.__dict__[new_column_name] = self.__dict__[column_name]
            del self.__dict__[column_name]
        self.sink(otq.RenameFieldsEp(rename_fields=f'(.*)=\\1{suffix}', use_regex=True))
        return self
[docs]
    @inplace_operation
    def time_filter(
        self,
        discard_on_match: bool = False,
        start_time: Union[str, int, time] = 0,
        end_time: Union[str, int, time] = 0,
        day_patterns: Union[str, List[str]] = "",
        timezone=utils.default,  # type: ignore
        end_time_tick_matches: bool = False,
        inplace=False,
    ) -> Optional['Source']:
        """
        Filters ticks by time.

        Parameters
        ----------
        discard_on_match : bool, optional
            If ``True``, then ticks that match the filter will be discarded.
            Otherwise, only ticks that match the filter will be passed.
        start_time : str or int or datetime.time, optional
            Start time of the filter; a string must be in the format ``HHMMSSmmm``.
            Default value is 0.
        end_time : str or int or datetime.time, optional
            End time of the filter; a string must be in the format ``HHMMSSmmm``.
            To filter ticks for an entire day, this parameter should be set to 240000000.
            Default value is 0.
        day_patterns : list or str
            Pattern or list of patterns that determine the days for which ticks can be propagated.
            A tick can be propagated if its date matches one or more of the patterns.
            The three supported pattern formats are:

            1. ``month.week.weekdays``: 0 month means any month, 0 week means any week,
               6 week means the last week of the month for a given weekday(s);
               weekdays are digits for each day, 0 being Sunday.
            2. ``month/day``: 0 month means any month.
            3. ``year/month/day``: 0 year means any year, 0 month means any month.
        timezone : str, optional
            Timezone of the filter. Default value is ``configuration.config.tz``
            or the timezone set in the parameter of :py:func:`onetick.py.run`.
        end_time_tick_matches : bool, optional
            If ``True``, then the end time is inclusive. Otherwise, the end time is exclusive.
        inplace : bool, optional
            The flag controls whether the operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.
            Default value is ``False``.

        Returns
        -------
        :class:`Source` or ``None``
            Returns ``None`` if ``inplace=True``.

        See also
        --------
        **TIME_FILTER** OneTick event processor

        Examples
        --------
        >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL')
        >>> data = data.time_filter(start_time='000000001', end_time='000000003')
        >>> otp.run(data, start=otp.dt(2022, 3, 1), end=otp.dt(2022, 3, 2))
                             Time  PRICE  SIZE
        0 2022-03-01 00:00:00.001    1.4    10
        1 2022-03-01 00:00:00.002    1.4    50
        """
        if timezone is utils.default:
            # doesn't work without expr for some reason
            timezone = 'expr(_TIMEZONE)'
        if day_patterns:
            if isinstance(day_patterns, str):
                day_patterns = [day_patterns]
            for day_pattern in day_patterns:
                if not re.match(r"(^\d\d?\.[0-6].\d\d?$)|(^\d\d?\/\d\d?$)|(^\d{1,4}\/\d\d?\/\d\d?$)",
                                day_pattern):
                    raise ValueError(f"Invalid day pattern: {day_pattern}")
        if isinstance(start_time, time):
            start_time = start_time.strftime('%H%M%S%f')[:-3]
        if isinstance(end_time, time):
            end_time = end_time.strftime('%H%M%S%f')[:-3]
        day_patterns = ",".join(day_patterns)
        self.sink(otq.TimeFilter(
            discard_on_match=discard_on_match,
            start_time=start_time,
            end_time=end_time,
            timezone=timezone,
            day_patterns=day_patterns,
            end_time_tick_matches=end_time_tick_matches,
        ))
        return self
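    # Sketches of the three supported day-pattern formats validated by the
    # regex above (the concrete dates are illustrative):
    #
    #   >>> data.time_filter(day_patterns='0.6.5')  # format 1: last Friday of any month
    #   >>> data.time_filter(day_patterns='12/25')  # format 2: December 25 of any year
    #   >>> data.time_filter(day_patterns='0/1/1')  # format 3: January 1 of any year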
[docs]
    @inplace_operation
    def insert_tick(self, fields=None, where=None, preserve_input_ticks=True,
                    num_ticks_to_insert=1, insert_before=True, inplace=False) -> Optional['Source']:
        """
        Insert tick.

        Parameters
        ----------
        fields: dict of str to :py:class:`onetick.py.Operation`
            Mapping of field names to expressions or values.
            These fields in inserted ticks will be set to the corresponding values
            or results of expressions.
            If a field is present in the input tick but not set in the ``fields`` dict,
            then the value of the field will be copied from the input tick to the inserted tick.
            If the ``fields`` parameter is not set at all, then the fields of inserted ticks
            will get the default values for their types from the input ticks (0 for integers etc.).
        where: :py:class:`onetick.py.Operation`
            Expression to select ticks near which the new ticks will be inserted.
            By default, all ticks are selected.
        preserve_input_ticks: bool
            A switch controlling whether input ticks have to be preserved
            in the output time series or not.
            In the former case fields of input ticks are present in the output time series
            together with those defined by the ``fields`` parameter;
            in the latter case only the defined fields are present.
            If a field of the input time series is defined in the ``fields`` parameter,
            the defined value takes precedence.
        num_ticks_to_insert: int
            Number of ticks to insert.
        insert_before: bool
            Insert the tick before each input tick or after.
        inplace: bool
            The flag controls whether the operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.

        See also
        --------
        **INSERT_TICK** OneTick event processor

        Returns
        -------
        :class:`Source` or ``None``

        Examples
        --------
        Insert a tick before each tick with default type values.

        >>> data = otp.Tick(A=1)
        >>> data = data.insert_tick()
        >>> otp.run(data)
                Time  A
        0 2003-12-01  0
        1 2003-12-01  1

        Insert a tick before each tick with field `A` copied from the input tick
        and field `B` set to the specified value.

        >>> data = otp.Tick(A=1)
        >>> data = data.insert_tick(fields={'B': 'b'})
        >>> otp.run(data)
                Time  A  B
        0 2003-12-01  1  b
        1 2003-12-01  1

        Insert two ticks only after the first tick.

        >>> data = otp.Ticks(A=[1, 2, 3])
        >>> data = data.insert_tick(where=data['A'] == 1,
        ...                         insert_before=False,
        ...                         num_ticks_to_insert=2)
        >>> otp.run(data)
                             Time  A
        0 2003-12-01 00:00:00.000  1
        1 2003-12-01 00:00:00.000  0
        2 2003-12-01 00:00:00.000  0
        3 2003-12-01 00:00:00.001  2
        4 2003-12-01 00:00:00.002  3
        """
        if not isinstance(num_ticks_to_insert, int) or num_ticks_to_insert <= 0:
            raise ValueError("Parameter 'num_ticks_to_insert' must be a positive integer")
        if not preserve_input_ticks and not fields:
            raise ValueError("Parameter 'fields' must be set if 'preserve_input_ticks' is False")
        where = '' if where is None else str(where)

        def _default_by_type(dtype):
            if issubclass(dtype, int):
                return 0
            if issubclass(dtype, float):
                return 0.0
            if issubclass(dtype, str):
                return ''
            if issubclass(dtype, ott.nsectime) or issubclass(dtype, ott.msectime):
                return 0

        fields = fields or {}
        update_schema = {}
        for field, value in fields.items():
            dtype = ott.get_object_type(value)
            if field not in self.schema:
                update_schema[field] = dtype
            elif dtype is not self.schema[field]:
                raise ValueError(f"Incompatible types for field '{field}': {self.schema[field]} --> {dtype}")
            dtype = ott.type2str(dtype)
            if isinstance(value, Type):
                value = _default_by_type(value)
            value = ott.value2str(value)
            fields[field] = (dtype, value)
        fields = ','.join(f'{field} {dtype}={value}' if value else f'{field} {dtype}'
                          for field, (dtype, value) in fields.items())
        self.sink(otq.InsertTick(
            fields=fields,
            where=where,
            preserve_input_ticks=preserve_input_ticks,
            num_ticks_to_insert=num_ticks_to_insert,
            insert_before=insert_before,
        ))
        if preserve_input_ticks:
            self.schema.update(**update_schema)
        else:
            self.schema.set(**update_schema)
        return self
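    # Passing a bare type as a field value makes _default_by_type above supply
    # that type's default for the inserted ticks. A hedged sketch:
    #
    #   >>> data = otp.Tick(A=1)
    #   >>> data = data.insert_tick(fields={'B': float})  # inserted ticks get B=0.0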
[docs]
    @inplace_operation
    def modify_query_times(self, start=None, end=None, output_timestamp=None,
                           propagate_heartbeats=True, inplace=False):
        """
        Modify ``start`` and ``end`` time of the query.

        * Query times are changed only for the operations **before** this method,
          up to the source of the graph.
        * All ticks' timestamps produced by this method **must** fall into the original
          time range of the query. It is possible to change ticks' timestamps with the
          parameter ``output_timestamp``, so they stay inside the original time range.

        Parameters
        ----------
        start: :py:class:`onetick.py.datetime` or \
               :py:class:`~onetick.py.core.source.MetaFields` or :py:class:`~onetick.py.Operation`
            Expression to replace query start time.
            By default, start time is not changed.
            Note that the expression in this parameter can't depend on ticks,
            thus only :py:class:`~onetick.py.core.source.MetaFields` and constants can be used.
        end: :py:class:`onetick.py.datetime` or \
             :py:class:`~onetick.py.core.source.MetaFields` or :py:class:`~onetick.py.Operation`
            Expression to replace query end time.
            By default, end time is not changed.
            Note that the expression in this parameter can't depend on ticks,
            thus only :py:class:`~onetick.py.core.source.MetaFields` and constants can be used.
        output_timestamp: :py:class:`onetick.py.Operation`
            Expression that produces the timestamp for each tick.
            By default, the following expression is used:

            ``orig_start + orig_timestamp - start``

            This expression covers cases when the start time of the query is changed
            and keeps the timestamp inside the original time range.
            Note that it doesn't cover all cases; for example, if the end time was increased,
            you have to handle such cases yourself.
        propagate_heartbeats: bool
            Controls heartbeat propagation.
        inplace: bool
            The flag controls whether the operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.

        See also
        --------
        | **MODIFY_QUERY_TIMES** OneTick event processor
        | :py:meth:`onetick.py.Source.time_interval_shift`

        Returns
        -------
        :class:`Source` or ``None``

        Examples
        --------
        >>> start = otp.dt(2022, 3, 2)
        >>> end = otp.dt(2022, 3, 2) + otp.Milli(3)
        >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD')

        By default, the method does nothing:

        >>> t = data.modify_query_times()
        >>> otp.run(t, start=start, end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.000    1.0   100
        1 2022-03-02 00:00:00.001    1.1   101
        2 2022-03-02 00:00:00.002    1.2   102

        See how the ``_START_TIME`` and ``_END_TIME`` meta fields are changed.
        They are changed *before* ``modify_query_times``:

        >>> t = data.copy()
        >>> t['S_BEFORE'] = t['_START_TIME']
        >>> t['E_BEFORE'] = t['_END_TIME']
        >>> t = t.modify_query_times(start=t['_START_TIME'] + otp.Milli(1),
        ...                          end=t['_END_TIME'] - otp.Milli(1))
        >>> t['S_AFTER'] = t['_START_TIME']
        >>> t['E_AFTER'] = t['_END_TIME']
        >>> otp.run(t, start=start, end=end)
                Time  PRICE  SIZE                S_BEFORE                E_BEFORE    S_AFTER                 E_AFTER
        0 2022-03-02    1.1   101 2022-03-02 00:00:00.001 2022-03-02 00:00:00.002 2022-03-02 2022-03-02 00:00:00.003

        You can decrease the time interval without problems:

        >>> t = data.modify_query_times(start=data['_START_TIME'] + otp.Milli(1),
        ...                             end=data['_END_TIME'] - otp.Milli(1))
        >>> otp.run(t, start=start, end=end)
                Time  PRICE  SIZE
        0 2022-03-02    1.1   101

        Note that the timestamp of the tick was changed by the default expression.
        In this case we can output original timestamps, because they fall into
        the original time range:

        >>> t = data.modify_query_times(start=data['_START_TIME'] + otp.Milli(1),
        ...                             end=data['_END_TIME'] - otp.Milli(1),
        ...                             output_timestamp=data['TIMESTAMP'])
        >>> otp.run(t, start=start, end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.001    1.1   101

        But it will not work if the new time range is wider than the original:

        >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Milli(1),
        ...                             output_timestamp=data['TIMESTAMP'])
        >>> otp.run(t, start=start + otp.Milli(1), end=end + otp.Milli(1))  # doctest: +ELLIPSIS
        Traceback (most recent call last):
        Exception...timestamp is falling out of initial start/end time range...

        In this case the default ``output_timestamp`` expression would work just fine:

        >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Milli(1))
        >>> otp.run(t, start=start + otp.Milli(1), end=end + otp.Milli(1))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.001    1.0   100
        1 2022-03-02 00:00:00.002    1.1   101
        2 2022-03-02 00:00:00.003    1.2   102

        But it doesn't work, for example, if the end time has crossed the borders of
        the original time range. In this case another ``output_timestamp`` expression
        must be specified:

        >>> t = data.modify_query_times(
        ...     start=data['_START_TIME'] - otp.Milli(2),
        ...     output_timestamp=otp.math.min(data['TIMESTAMP'] + otp.Milli(2), data['_END_TIME'])
        ... )
        >>> otp.run(t, start=start + otp.Milli(2), end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.002    1.0   100
        1 2022-03-02 00:00:00.003    1.1   101
        2 2022-03-02 00:00:00.003    1.2   102

        Remember that the ``start`` and ``end`` parameters can't depend on ticks:

        >>> t = data.copy()
        >>> t['X'] = 12345
        >>> t = t.modify_query_times(start=t['_START_TIME'] + t['X'] - t['X'],
        ...                          end=t['_END_TIME'] - otp.Milli(1))
        >>> otp.run(t, start=start, end=end)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
        Exception...parameter must not depend on ticks...

        Constant datetime values can be used as parameters too:

        >>> t = data.modify_query_times(start=start + otp.Milli(1),
        ...                             end=end - otp.Milli(1))
        >>> otp.run(t, start=start, end=end)
                Time  PRICE  SIZE
        0 2022-03-02    1.1   101

        Note that some graph patterns are not allowed when using this method.
        For example, modifying query times for a branch that will be merged later:

        >>> t1, t2 = data[data['PRICE'] > 1.3]
        >>> t2 = t2.modify_query_times(start=start + otp.Milli(1))
        >>> t = otp.merge([t1, t2])
        >>> otp.run(t, start=start, end=end)  # doctest: +ELLIPSIS
        Traceback (most recent call last):
        Exception...Invalid graph...time bound to a node...an intermediate node in one of the cycles in graph...
        """
        start = ott.value2str(start) if start is not None else ''
        end = ott.value2str(end) if end is not None else ''
        output_timestamp = ott.value2str(output_timestamp) if output_timestamp is not None else ''
        self.sink(otq.ModifyQueryTimes(
            start_time=start,
            end_time=end,
            output_timestamp=output_timestamp,
            propagate_heartbeats=propagate_heartbeats,
        ))
        return self
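    # The parameters are serialized with ott.value2str, and None turns into an
    # empty string, which MODIFY_QUERY_TIMES treats as "leave unchanged". So a
    # call touching only one bound is a sketch like:
    #
    #   >>> t = data.modify_query_times(end=data['_END_TIME'] - otp.Milli(1))
    #   >>> # start stays '' -> the query start time is not modified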
[docs]
    def time_interval_shift(self, shift, inplace=False):
        """
        Shifting the time interval for a source.

        The whole data flow is shifted all the way up to the source of the graph.
        The ticks' timestamps are changed accordingly so they fit into the original time range.

        Parameters
        ----------
        shift: int or :ref:`datetime offset<offsets>`
            Offset to shift the whole time interval.
            Can be positive or negative.
            A positive value moves the time interval into the future, a negative one -- to the past.
            int values are interpreted as milliseconds.
        inplace: bool
            The flag controls whether the operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.

        Returns
        -------
        :class:`Source` or ``None``

        See also
        --------
        | :py:meth:`onetick.py.Source.modify_query_times`
        | :py:meth:`onetick.py.Source.time_interval_change`

        Examples
        --------

        --> Also see the use-case using :py:meth:`time_interval_shift` for calculating
        `Markouts <../../static/getting_started/time_based_joins.html#use-case-computing-markouts>`_

        >>> start = otp.dt(2022, 3, 2)
        >>> end = otp.dt(2022, 3, 2) + otp.Milli(3)
        >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD')

        Default data:

        >>> otp.run(data, start=start, end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.000    1.0   100
        1 2022-03-02 00:00:00.001    1.1   101
        2 2022-03-02 00:00:00.002    1.2   102

        Get the window for the third tick:

        >>> otp.run(data, start=start + otp.Milli(2), end=start + otp.Milli(3))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.002    1.2   102

        Shifting the time window will result in a different set of ticks, but the ticks
        will have their timestamps changed to fit into the original time range.
        Let's shift the time 2 milliseconds back and thus get the first tick:

        >>> t = data.time_interval_shift(shift=-otp.Milli(2))
        >>> otp.run(t, start=start + otp.Milli(2), end=start + otp.Milli(3))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.002    1.0   100

        Here we are querying an empty time interval, but shifting one second back to get ticks.

        >>> t = data.time_interval_shift(shift=-otp.Second(1))
        >>> otp.run(t, start=start + otp.Second(1), end=end + otp.Second(1))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:01.000    1.0   100
        1 2022-03-02 00:00:01.001    1.1   101
        2 2022-03-02 00:00:01.002    1.2   102
        """
        start = self['_START_TIME'] + shift
        end = self['_END_TIME'] + shift
        # change timestamps so they fit into original time range
        output_timestamp = self['TIMESTAMP'] - shift
        return self.modify_query_times(start=start, end=end,
                                       output_timestamp=output_timestamp, inplace=inplace)
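    # The shift is added to both query bounds and subtracted back from tick
    # timestamps, so shifting a day back is a sketch for "query yesterday's
    # ticks but report them in today's window" (assuming an otp.Day offset
    # exists alongside the otp.Milli/otp.Second offsets used above):
    #
    #   >>> t = data.time_interval_shift(shift=-otp.Day(1))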
[docs]
    def time_interval_change(self, start_change=0, end_change=0, inplace=False):
        """
        Changing the time interval by making it bigger or smaller.

        All timestamps of ticks that are crossing the border of the original time range
        will be set to the original start time or end time, depending on their original time.

        Parameters
        ----------
        start_change: int or :ref:`datetime offset<offsets>`
            Offset to shift the start time. Can be positive or negative.
            A positive value moves the start time into the future, a negative one -- to the past.
            int values are interpreted as milliseconds.
        end_change: int or :ref:`datetime offset<offsets>`
            Offset to shift the end time. Can be positive or negative.
            A positive value moves the end time into the future, a negative one -- to the past.
            int values are interpreted as milliseconds.
        inplace: bool
            The flag controls whether the operation should be applied inplace or not.
            If ``inplace=True``, then it returns nothing.
            Otherwise the method returns a new modified object.

        Returns
        -------
        :class:`Source` or ``None``

        See also
        --------
        | :py:meth:`onetick.py.Source.modify_query_times`
        | :py:meth:`onetick.py.Source.time_interval_shift`

        Examples
        --------
        >>> start = otp.dt(2022, 3, 2)
        >>> end = otp.dt(2022, 3, 2) + otp.Milli(3)
        >>> data = otp.DataSource('NYSE_TAQ', symbols='AAPL', tick_type='TRD')

        By default, ``time_interval_change()`` does nothing:

        >>> t = data.time_interval_change()
        >>> otp.run(t, start=start, end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.000    1.0   100
        1 2022-03-02 00:00:00.001    1.1   101
        2 2022-03-02 00:00:00.002    1.2   102

        Decreasing the time range will not change ticks' timestamps:

        >>> t = data.time_interval_change(start_change=otp.Milli(1), end_change=-otp.Milli(1))
        >>> otp.run(t, start=start, end=end)
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.001    1.1   101

        Increasing the time range will change the timestamps of the ticks that crossed
        the border. In this case the first tick's timestamp will be set to the original
        start time, and the third tick's to the original end time.

        >>> t = data.time_interval_change(start_change=-otp.Milli(1), end_change=otp.Milli(1))
        >>> otp.run(t, start=start + otp.Milli(1), end=start + otp.Milli(2))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:00.001    1.0   100
        1 2022-03-02 00:00:00.001    1.1   101
        2 2022-03-02 00:00:00.002    1.2   102

        Here we are querying an empty time interval, but changing the start time
        one second back to get ticks.

        >>> t = data.time_interval_change(start_change=-otp.Second(1))
        >>> otp.run(t, start=start + otp.Second(1), end=end + otp.Second(1))
                             Time  PRICE  SIZE
        0 2022-03-02 00:00:01    1.0   100
        1 2022-03-02 00:00:01    1.1   101
        2 2022-03-02 00:00:01    1.2   102
        """
        start = self['_START_TIME'] + start_change
        end = self['_END_TIME'] + end_change
        # change ticks' timestamps only if they are out of bounds
        output_timestamp = self['TIMESTAMP']
        output_timestamp = otp.math.min(output_timestamp, self['_END_TIME'])
        output_timestamp = otp.math.max(output_timestamp, self['_START_TIME'])
        return self.modify_query_times(start=start, end=end,
                                       output_timestamp=output_timestamp, inplace=inplace)
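    # The clamping above can be restated from the caller's side as an explicit
    # modify_query_times call; a sketch equivalent to
    # time_interval_change(start_change=-otp.Second(1)):
    #
    #   >>> ts = otp.math.max(otp.math.min(data['TIMESTAMP'], data['_END_TIME']),
    #   ...                   data['_START_TIME'])
    #   >>> t = data.modify_query_times(start=data['_START_TIME'] - otp.Second(1),
    #   ...                             end=data['_END_TIME'], output_timestamp=ts)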