import datetime as dt
import inspect
import operator
import os
import sys
import warnings
import io
import math

from functools import partial
from typing import Optional, Union, Type, Iterable

import onetick.py as otp
import onetick.query as otq
import pandas as pd

import onetick.py.core._source
import onetick.py.functions
import onetick.py.db._inspection

from onetick.py.core._internal._param_column import _ParamColumn
from onetick.py.core._source._symbol_param_column import _SymbolParamColumn
from onetick.py.core._source.tmp_otq import TmpOtq
from onetick.py.core.column import _Column
from onetick.py.core.eval_query import _QueryEvalWrapper
from onetick.py.core.source import Source, _Source  # _Source for backward compatibility

from . import types as ott
from . import utils, configuration
from .core import _csv_inspector, query_inspector
from .core.column_operations._methods.methods import is_arithmetical
from .core.column_operations.base import _Operation
from .db.db import DB
from .db._inspection import DB as inspect_DB
from .aggregations._docs import docstring, _param_doc
from .aggregations.order_book import (
    OB_SNAPSHOT_DOC_PARAMS,
    OB_SNAPSHOT_WIDE_DOC_PARAMS,
    OB_SNAPSHOT_FLAT_DOC_PARAMS,
)

_QUERY_PARAM_SPECIAL_CHARACTERS = "=,"

AdaptiveTickType = Union[str, Type[utils.adaptive]]


def update_node_tick_type(node: "Source", tick_type: AdaptiveTickType, db: Optional[str] = None):
    """Update node tick_type according to db name and tick_type.

    Don't change tick type for adaptive tick type.

    Parameters
    ----------
    node: Source
        node to set tick_type on
    tick_type: AdaptiveTickType
        string tick type or :py:class:`onetick.py.adaptive`
    db: Optional[str]
        optional db name
    """
    # do not change tick type for adaptive `tick_type`
    if not isinstance(tick_type, type) and tick_type is not utils.adaptive:
        if db:
            node.tick_type(db + "::" + tick_type)
        else:
            node.tick_type(tick_type)


class Tick(Source):
    """
    Generate a single tick object

    Parameters
    ----------
    offset: int, default=0
        tick timestamp offset from query start time in `offset_part`
    offset_part: one of [nanosecond, millisecond, second, minute, hour, day, dayofyear, weekday, week, month, quarter, year], default=millisecond  # noqa
        offset unit
    symbol: data symbol
    db: data database
    start: start time for tick
    end: end time for tick
    tick_type: AdaptiveTickType
        Special tick_type `TICK_GENERATOR` will be used by default.
        You can use :py:class:`onetick.py.adaptive` for the value
        if you want to use sink node tick type instead of defining your own.
    bucket_time: ???
    bucket_interval: ???
    kwargs

    See also
    --------
    **TICK_GENERATOR** OneTick event processor
    """

    def __init__(
        self,
        offset=0,
        offset_part='millisecond',
        time: ott.datetime = None,
        timezone_for_time=None,
        symbol=utils.adaptive_to_default,
        db=utils.adaptive_to_default,
        start=utils.adaptive,
        end=utils.adaptive,
        tick_type: Optional[AdaptiveTickType] = None,
        bucket_time: str = "start",
        bucket_interval: int = 0,
        **kwargs,
    ):
        if self._try_default_constructor(**kwargs):
            return
        if len(kwargs) == 0:
            raise ValueError("It is not allowed to have a tick without fields")
        if time is not None and offset != 0:
            raise ValueError("It's not allowed to set parameter 'time' and set non-zero offset at the same time")
        bucket_time = self._get_bucket_time(bucket_time)
        if tick_type is None:
            tick_type = "TICK_GENERATOR"

        columns = {}
        for key, value in kwargs.items():
            # the way to skip a field
            if value is None:
                continue
            if inspect.isclass(value):
                raise TypeError(f"Tick constructor expects values but not types, {value}")
            else:
                value_type = ott.get_object_type(value)

                if value_type is str:
                    if isinstance(value, _Column) or is_arithmetical(value):
                        if value.dtype is not str:
                            value_type = value.dtype
                    elif len(value) > ott.string.DEFAULT_LENGTH:
                        value_type = ott.string[len(value)]
                if value_type is bool:
                    value_type = float
                if issubclass(value_type, (ott.datetime, ott.date, dt.datetime, dt.date, pd.Timestamp)):
                    value_type = ott.nsectime

                columns[key] = value_type

        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(
                db=db,
                tick_type=tick_type,
                offset=offset,
                offset_part=offset_part,
                time=time,
                timezone_for_time=timezone_for_time,
                columns=columns,
                bucket_time=bucket_time,
                bucket_interval=bucket_interval,
                **kwargs,
            ),
            **columns,
        )

    def base_ep(
        self,
        db=utils.adaptive_to_default,
        tick_type="TICK_GENERATOR",
        offset=0,
        offset_part='millisecond',
        time=None,
        timezone_for_time=None,
        columns=None,
        bucket_time="start",
        bucket_interval=0,
        **kwargs,
    ):
        if columns is None:
            columns = {}
        if db is utils.adaptive_to_default:
            # if default database is not set, tick type will be set without it
            # and symbols will have to be specified in otp.run
            db = configuration.config.get('default_db')

        params = ",".join(
            ott.type2str(columns[key]) + " " + str(key) + "=" + ott.value2str(value)
            for key, value in kwargs.items() if value is not None
        )

        src = Source(
            otq.TickGenerator(bucket_interval=bucket_interval, bucket_time=bucket_time, fields=params),
            **columns,
        )
        update_node_tick_type(src, tick_type, db)

        # TIMESTAMP += offset will add redundant nodes to sort the timestamps.
        # No sorting needed for a single tick.
        if offset:
            src.sink(otq.UpdateField(field="TIMESTAMP",
                                     value=f"dateadd('{offset_part}', {offset}, TIMESTAMP, _TIMEZONE)"))
        elif time:
            src.sink(otq.UpdateField(field="TIMESTAMP",
                                     value=ott.datetime2expr(time, timezone_naive=timezone_for_time)))
        return src

    @staticmethod
    def _get_bucket_time(bucket_time):
        if bucket_time == "BUCKET_START":
            warnings.warn("BUCKET_START value is deprecated. Please, use 'start' instead", DeprecationWarning)
        elif bucket_time == "BUCKET_END":
            warnings.warn("BUCKET_END value is deprecated. Please, use 'end' instead", DeprecationWarning)
        elif bucket_time == "start":
            bucket_time = "BUCKET_START"
        elif bucket_time == "end":
            bucket_time = "BUCKET_END"
        else:
            raise ValueError(f"Only 'start' and 'end' values are supported as bucket time, "
                             f"but you've passed {bucket_time}")
        return bucket_time
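

# Usage sketch (illustrative, not part of the library): otp.Tick generates a single
# tick with the given fields at the query start time, optionally shifted by `offset`.
#
#   import onetick.py as otp
#
#   t1 = otp.Tick(A=1, B='x')                            # one tick with fields A and B
#   t2 = otp.Tick(A=2, offset=5)                         # shifted 5 milliseconds from start
#   t3 = otp.Tick(A=3, offset=1, offset_part='second')   # shifted by 1 second
#   # df = otp.run(t1)  # executes the query; actual output depends on your configuration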
def Ticks(
    data=None,
    symbol=utils.adaptive_to_default,
    db=utils.adaptive_to_default,
    start=utils.adaptive,
    end=utils.adaptive,
    tick_type: Optional[AdaptiveTickType] = None,
    timezone_for_time=None,
    **inplace_data,
):
    """
    Data source that generates ticks.

    Ticks are placed with a 1 millisecond offset from each other, starting
    from the start of the query interval.
    The distance between ticks can be changed using the special reserved field name
    ``offset``, which specifies the time offset from the previous tick.

    Parameters
    ----------
    data: dict, list or pandas.DataFrame, optional
        Ticks values

        * ``dict`` -- <field_name>: <values>

        * ``list`` -- [[<field_names>], [<first_tick_values>], ..., [<n_tick_values>]]

        * :pandas:`DataFrame <pandas.DataFrame>` -- DataFrame with ``Time`` column

        * ``None`` -- ``inplace_data`` will be used
    symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
        Symbol(s) from which data should be taken.
    db: str
        Database to use for tick generation
    start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
        :py:class:`onetick.py.adaptive`
        Timestamp for data generation
    tick_type: str
        tick type for data generation
    timezone_for_time: str
        timezone for data generation
    **inplace_data: dict
        <field_name>: list(<field_values>)

    See also
    --------
    **TICK_GENERATOR** OneTick event processor

    Examples
    --------
    Pass data in ``dict``

    >>> d = otp.Ticks({'A': [1, 2, 3], 'B': [4, 5, 6]})
    >>> otp.run(d)
                         Time  A  B
    0 2003-12-01 00:00:00.000  1  4
    1 2003-12-01 00:00:00.001  2  5
    2 2003-12-01 00:00:00.002  3  6

    Pass ``inplace_data``

    >>> d = otp.Ticks(A=[1, 2, 3], B=[4, 5, 6])
    >>> otp.run(d)
                         Time  A  B
    0 2003-12-01 00:00:00.000  1  4
    1 2003-12-01 00:00:00.001  2  5
    2 2003-12-01 00:00:00.002  3  6

    Pass data in ``list``

    >>> d = otp.Ticks([['A', 'B'],
    ...                [1, 4],
    ...                [2, 5],
    ...                [3, 6]])
    >>> otp.run(d)
                         Time  A  B
    0 2003-12-01 00:00:00.000  1  4
    1 2003-12-01 00:00:00.001  2  5
    2 2003-12-01 00:00:00.002  3  6

    Using the ``offset`` example

    >>> data = otp.Ticks(X=[1, 2, 3], offset=[0, otp.Nano(1), 1])
    >>> otp.run(data)
                               Time  X
    0 2003-12-01 00:00:00.000000000  1
    1 2003-12-01 00:00:00.000000001  2
    2 2003-12-01 00:00:00.001000000  3

    Using pandas.DataFrame

    >>> start_datetime = datetime(2023, 1, 1, 12)
    >>> time_array = [start_datetime + otp.Hour(1) + otp.Nano(1)]
    >>> a_array = [start_datetime - otp.Day(15) - otp.Nano(7)]
    >>> df = pd.DataFrame({'Time': time_array, 'A': a_array})
    >>> data = otp.Ticks(df)
    >>> otp.run(data, start=start_datetime, end=start_datetime + otp.Day(1))
                               Time                             A
    0 2023-01-01 13:00:00.000000001 2022-12-17 11:59:59.999999993
    """
    if tick_type is None:
        tick_type = "TICK_GENERATOR"
    if db is utils.adaptive_to_default:
        db = configuration.config.get('default_db')

    if isinstance(data, pd.DataFrame):
        if 'Time' not in data.columns:
            raise ValueError('Field `Time` is required for constructing an `otp.Source` from `pandas.DataFrame`')
        data = data.rename(columns={"Time": "time"})
        data = data.to_dict('list')

    if data and len(inplace_data) != 0:
        raise ValueError("Data can be passed only using either the `data` parameter "
                         "or inplace through the key-value args")

    if isinstance(data, list):
        reform = {}
        for inx, key in enumerate(data[0]):
            reform[key] = [sub_list[inx] for sub_list in data[1:]]
        data = reform

    if data is None:
        if inplace_data:
            data = inplace_data
        else:
            raise ValueError("You have not specified any data to create ticks from. "
                             "Please, use otp.Empty for creating an empty data source")
    else:
        data = data.copy()

    value_len = -1
    for key, value in data.items():
        if value_len == -1:
            value_len = len(value)
        else:
            if value_len != len(value):
                # TODO: write test to cover that case
                raise ValueError(f"It is not allowed to have columns of different lengths: "
                                 f"some of the columns have length {value_len}, "
                                 f"but column '{key}', for instance, has length {len(value)}")

    use_absolute_time = False
    if "offset" in data:
        if "time" in data:
            raise ValueError("You cannot specify offset and time at the same time")
    else:
        if "time" in data:
            use_absolute_time = True
        else:
            data["offset"] = list(range(value_len))

    if not use_absolute_time:
        offset_values = []
        offset_parts = []
        for ofv in data['offset']:
            if isinstance(ofv, ott.offsets.Tick):
                offset_values.append(ofv.n)
                offset_parts.append(str(ofv.datepart)[1:-1])
            else:
                offset_values.append(ofv)
                offset_parts.append('millisecond')
        data['offset'] = offset_values
        data['offset_part'] = offset_parts

    if value_len == 1:
        columns = {key: value[0] for key, value in data.items()}
        return Tick(db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
                    timezone_for_time=timezone_for_time, **columns)
    else:
        # select only columns that do not contain None there to support
        # heterogeneous data
        not_none_columns = []
        for key in data.keys():
            data[key] = [float(elem) if isinstance(elem, bool) else elem for elem in data[key]]
        for key, value in data.items():
            add = True
            for v in value:
                # we need it, because can't use _Column instances in if-clauses
                if isinstance(v, _Column):
                    continue
                if v is None:
                    add = False
                    break
            if add:
                not_none_columns.append(key)

        # if a field depends on a symbol parameter, it cannot be csv'd (it's dynamic)
        # likewise for otq parameters
        # if there's a better way to check whether a value is constant,
        # will be glad to hear about it
        is_outside_data_dependent = False
        for key, value in data.items():
            for v in value:
                str_rep = str(v)
                if ("_SYMBOL_NAME" in str_rep) or ("_SYMBOL_PARAM" in str_rep) or ("$" in str_rep):
                    is_outside_data_dependent = True
                    break

        # infinity() and (on windows) nan() cannot be natively read from a csv
        has_special_values = False
        for key, value in data.items():
            for v in value:
                if isinstance(v, ott._inf) or \
                        (isinstance(v, ott._nan) or isinstance(v, float) and math.isnan(v)) \
                        and sys.platform.startswith("win"):
                    has_special_values = True
                    break

        if (len(not_none_columns) == len(data)) and (not is_outside_data_dependent) and (not has_special_values):
            # Data is homogeneous; CSV backing can be used
            return _DataCSV(data, value_len, db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
                            timezone_for_time=timezone_for_time, use_absolute_time=use_absolute_time)
        else:
            # Fallback is a merge of individual ticks
            ticks = []
            for inx in range(value_len):
                columns = {key: value[inx] for key, value in data.items()}
                ticks.append(Tick(db=db, symbol=symbol, tick_type=tick_type, start=start, end=end,
                                  timezone_for_time=timezone_for_time, **columns))
            return onetick.py.functions.merge(ticks, align_schema=not_none_columns)
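

# Note (illustrative): otp.Ticks picks one of the two strategies above. Homogeneous,
# constant data is serialized into an in-memory CSV (_DataCSV), while data with None
# values (heterogeneous schemas), symbol-parameter dependencies, or special float
# values falls back to merging individual otp.Tick sources, e.g.:
#
#   mixed = otp.Ticks({'A': [1, None], 'B': [None, 'x']})  # triggers the per-tick merge fallback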
class _DataCSV(Source):
    def __init__(
        self,
        data=None,
        length=None,
        db=utils.adaptive_to_default,
        symbol=utils.adaptive_to_default,
        tick_type=None,
        start=utils.adaptive,
        end=utils.adaptive,
        use_absolute_time=False,
        timezone_for_time=None,
        **kwargs,
    ):
        if self._try_default_constructor(**kwargs):
            return
        if data is None or length is None:
            raise ValueError("'data' and 'length' parameters can't be None")
        if db is utils.adaptive_to_default:
            db = configuration.config.get('default_db')

        def datetime_to_expr(v):
            if ott.is_time_type(v):
                return ott.datetime2expr(v, timezone_naive=timezone_for_time)
            if isinstance(v, ott.nsectime):
                # TODO: change to ott.value2str after PY-441
                return f'NSECTIME({v})'
            if isinstance(v, ott.msectime):
                return ott.value2str(v)
            raise ValueError(f"Can't convert value {v} to datetime expression")

        if use_absolute_time:
            # converting values of the "time" column to onetick expressions
            converted_times = []
            for d in data["time"]:
                converted_times.append(datetime_to_expr(d))
            data["time"] = converted_times

        def csv_rep(value):
            if issubclass(type(value), str):
                return '"' + value.replace("\\", "\\\\").replace('"', '\\"') + '"'
            else:
                return str(value)

        def get_type_of_column(key):
            def get_type_of_value(value):
                t = ott.get_object_type(value)
                if ott.is_time_type(t):
                    return ott.nsectime
                elif t is str:
                    if len(value) <= ott.string.DEFAULT_LENGTH:
                        return str
                    else:
                        return ott.string[len(value)]
                else:
                    return t

            types = [get_type_of_value(v) for v in data[key]]
            res, _ = utils.get_type_that_includes(types)
            return res

        columns = {key: get_type_of_column(key) for key in data}

        expression_columns = []
        header_columns = {}
        for key in list(columns):
            header_columns[key] = columns[key]
            # converting values of datetime columns to onetick expressions
            if columns[key] is ott.nsectime:
                data[key] = [datetime_to_expr(v) for v in data[key]]
                header_columns[key] = get_type_of_column(key)
                expression_columns.append(key)

        transposed_data = [[csv_rep(value[i]) for key, value in data.items()] for i in range(length)]

        text_header = ",".join(f"{ott.type2str(v)} {k}" for k, v in header_columns.items())
        text_data = "\n".join([",".join(data_row) for data_row in transposed_data])

        if use_absolute_time:
            del columns["time"]
        else:
            del columns["offset"]
            del columns["offset_part"]

        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(columns=columns,
                                               db=db,
                                               tick_type=tick_type,
                                               use_absolute_time=use_absolute_time,
                                               text_header=text_header,
                                               text_data=text_data,
                                               expression_columns=expression_columns),
            **columns,
        )

    def base_ep(self, columns, db, tick_type, use_absolute_time, text_header, text_data, expression_columns=None):
        node = Source(
            otq.CsvFileListing(
                discard_timestamp_column=True,
                time_assignment="_START_TIME",
                field_delimiters="','",
                quote_chars='"""',
                handle_escaped_chars=True,
                file_contents=text_data,
                first_line_is_title=False,
                fields=text_header,
            ),
            **columns,
        )
        update_node_tick_type(node, tick_type, db)

        if use_absolute_time:
            # don't trust UpdateField
            node.sink(otq.AddField(field='____TMP____', value="EVAL_EXPRESSION(time, 'datetime')"))
            node.sink(otq.UpdateField(field="TIMESTAMP", value="____TMP____"))
            node.sink(otq.Passthrough(fields="time,____TMP____", drop_fields="True"))
            node.sink(otq.OrderBy(order_by="TIMESTAMP ASC"))
        else:
            node.sink(otq.OrderBy(order_by="offset ASC"))
            node.sink(otq.UpdateField(field="TIMESTAMP",
                                      value="dateadd(offset_part, offset, TIMESTAMP, _TIMEZONE)"))
            node.sink(otq.Passthrough(fields="offset,offset_part", drop_fields="True"))
            node.sink(otq.OrderBy(order_by="TIMESTAMP ASC"))

        for column in expression_columns or []:
            # don't trust UpdateField
            node.sink(otq.RenameFields(f'{column}=____TMP____'))
            node.sink(otq.AddField(field=column, value="EVAL_EXPRESSION(____TMP____, 'datetime')"))
            node.sink(otq.Passthrough(fields='____TMP____', drop_fields=True))

        node.sink(otq.Table(keep_input_fields=True,
                            fields=', '.join(f'nsectime {column}' for column in expression_columns)))
        return node


def TTicks(data):
    """
    .. deprecated:: 1.3.101

    Transposed Ticks format.

    Parameters
    ----------
    data: list
        list of lists, where the first sublist is the header, and the others are values
    """
    warnings.warn("The nice and helpful function `TTicks` is going to be deprecated. "
                  "You could use the `Ticks` to pass data in the same format there", DeprecationWarning)
    dt = {}
    for inx, key in enumerate(data[0]):
        dt[key] = [sub_list[inx] for sub_list in data[1:]]
    return Ticks(dt)
class Empty(Source):
    """
    Empty data source

    Parameters
    ----------
    db: str
        Name of the database from which to take the schema.
    symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
        Symbol(s) from which data should be taken.
    tick_type: str
        Name of the tick_type from which to take the schema.
    start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
        :py:class:`onetick.py.adaptive`
        Time interval from which the data should be taken.
    schema:
        schema to use in case db and/or tick_type are not set

    Examples
    --------
    We can define schema:

    >>> data = otp.Empty(A=str, B=int)
    >>> otp.run(data)
    Empty DataFrame
    Columns: []
    Index: []
    >>> data.columns()
    {'A': <class 'str'>, 'B': <class 'int'>, 'TIMESTAMP': <class 'onetick.py.types.nsectime'>, '_START_TIME': <class 'onetick.py.types.nsectime'>, '_END_TIME': <class 'onetick.py.types.nsectime'>, '_SYMBOL_NAME': <class 'str'>, '_DBNAME': <class 'str'>, '_TICK_TYPE': <class 'str'>, '_TIMEZONE': <class 'str'>}

    Or we can get schema from the database:

    >>> data = otp.Empty(db='SOME_DB', tick_type='TT')
    >>> data.columns()
    {'X': <class 'int'>, 'TIMESTAMP': <class 'onetick.py.types.nsectime'>, '_START_TIME': <class 'onetick.py.types.nsectime'>, '_END_TIME': <class 'onetick.py.types.nsectime'>, '_SYMBOL_NAME': <class 'str'>, '_DBNAME': <class 'str'>, '_TICK_TYPE': <class 'str'>, '_TIMEZONE': <class 'str'>}
    """

    def __init__(
        self,
        db=utils.adaptive_to_default,
        symbol=utils.adaptive_to_default,
        tick_type=None,
        start=utils.adaptive,
        end=utils.adaptive,
        **schema,
    ):
        if self._try_default_constructor(**schema):
            return

        columns = {}
        if tick_type and db != configuration.config.get('default_db') and db is not utils.adaptive_to_default:
            try:
                db_obj = onetick.py.db._inspection.DB(db)
                params = {'tick_type': tick_type}
                if end is not utils.adaptive:
                    params['end'] = end
                columns = db_obj.schema(**params)
            except Exception:
                # do not raise an exception if no data is found,
                # because it is an empty source and it does not matter
                pass
        else:
            columns = schema

        super().__init__(_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **columns)

    def base_ep(self, db):
        if db is utils.adaptive_to_default:
            db = configuration.config.get('default_db')

        src = Source(otq.TickGenerator(fields="long ___NOTHING___=0"))
        if db is None:
            src.tick_type('TICK_GENERATOR')
        else:
            src.tick_type(db + "::TICK_GENERATOR")
        return src
class CSV(Source):
    """
    Construct source based on CSV file.

    There are several steps determining column types.

    1. Initially, all columns are treated as ``str``.
    2. If a column name in the CSV title has the format ``type COLUMNNAME``,
       its type will be changed from ``str`` to the specified type.
    3. All column types are determined automatically from the data.
    4. You could override the determined types in the ``dtype`` argument explicitly.
    5. The ``converters`` argument is applied after ``dtype`` and could also change column type.

    Parameters
    ----------
    filepath_or_buffer: str, os.PathLike, FileBuffer
        Path to CSV file or :class:`file buffer <FileBuffer>`
    timestamp_name: str, default "Time"
        Name of the TIMESTAMP column used for ticks.
        Used only if it exists in the CSV columns, otherwise ignored.
    first_line_is_title: bool
        Use the first line of the CSV file as a source for column names and types.
        If the CSV file starts with the # symbol, this parameter **must** be ``True``.

        - If ``True``, column names are inferred from the first line of the file;
          it is not allowed to have an empty name for any column.
        - If ``False``, the first line is processed as data, and column names
          will be COLUMN_1, ..., COLUMN_N.
          You could specify column names in the ``names`` argument.
    names: list, optional
        List of column names to use, or None.
        Length must be equal to the number of columns in the file.
        Duplicates in this list are not allowed.
    dtype: dict, optional
        Data type for columns, as a dict of pairs {column_name: type}.
        Will convert the column type from ``str`` to the specified type,
        before applying converters.
    converters: dict, optional
        Dict of functions for converting values in certain columns. Keys are column names.
        Function must be a valid callable with ``onetick.py`` syntax, for example::

            converters={
                "time_number": lambda c: c.apply(otp.nsectime),
                "stock": lambda c: c.str.lower(),
            }

        Converters are applied *after* ``dtype`` conversion.
    order_ticks: bool, optional
        If ``True`` and the ``timestamp_name`` column is used, then the source
        will order ticks by time.
        Note, that if ``False`` and ticks are not ordered in sequence,
        then OneTick will raise an exception in runtime.
    drop_index: bool, optional
        If ``True`` and the 'Index' column is in the csv file, then this column will be removed.
    change_date_to: datetime, date, optional
        Change the date from the timestamp column to a specific date.
        Default is None, which means not changing the timestamp column.

    See also
    --------
    **CSV_FILE_LISTING** OneTick event processor

    Examples
    --------
    Simple CSV file reading

    >>> data = otp.CSV(os.path.join(csv_path, "data.csv"))
    >>> otp.run(data)
                         Time          time_number      px side
    0 2003-12-01 00:00:00.000  1656690986953602304   30.89  Buy
    1 2003-12-01 00:00:00.001  1656667706281508352  682.88  Buy

    Read CSV file and get the timestamp for ticks from a specific field.
    You need to specify a query start/end interval including all ticks.

    >>> data = otp.CSV(os.path.join(csv_path, "data.csv"),
    ...                timestamp_name="time_number",
    ...                converters={"time_number": lambda c: c.apply(otp.nsectime)},
    ...                start=otp.dt(2010, 8, 1),
    ...                end=otp.dt(2022, 9, 2))
    >>> otp.run(data)
                               Time      px side
    0 2022-07-01 11:56:26.953602304   30.89  Buy
    1 2022-07-01 05:28:26.281508352  682.88  Buy
    """

    def __init__(
        self,
        filepath_or_buffer=None,
        timestamp_name: Union[str, None] = "Time",
        first_line_is_title: bool = True,
        names: Union[list, None] = None,
        dtype: dict = {},
        converters: dict = {},
        order_ticks=False,
        drop_index=True,
        change_date_to=None,
        **kwargs,
    ):
        if self._try_default_constructor(**kwargs):
            return

        obj_to_inspect = filepath_or_buffer
        if isinstance(filepath_or_buffer, utils.FileBuffer):
            obj_to_inspect = io.StringIO(filepath_or_buffer.get())

        if isinstance(obj_to_inspect, str) and not os.path.exists(obj_to_inspect):
            # if not found, probably, the CSV file is located in OneTick CSV_FILE_PATH;
            # check it for inspect_by_pandas()
            csv_paths = otp.utils.get_config_param(os.environ["ONE_TICK_CONFIG"], "CSV_FILE_PATH", default="")
            if csv_paths:
                for csv_path in csv_paths.split(","):
                    csv_path = os.path.join(csv_path, obj_to_inspect)
                    if os.path.exists(csv_path):
                        obj_to_inspect = csv_path
                        break

        columns, default_types, forced_title = _csv_inspector.inspect_by_pandas(obj_to_inspect,
                                                                                first_line_is_title,
                                                                                names)

        if "TIMESTAMP" in columns:
            raise ValueError("It is not allowed to have 'TIMESTAMP' columns, because it is a reserved name in OneTick")
        if "Time" in columns and timestamp_name != "Time":
            raise ValueError("It is not allowed to have a 'Time' column that is not used as the timestamp field.")

        ep_fields = ",".join(
            f'{ott.type2str(dtype)} {column}' if issubclass(dtype, otp.string) else column
            for column, dtype in columns.items()
        )

        to_drop = []
        if "TICK_STATUS" in columns:
            del columns["TICK_STATUS"]
            to_drop.append("TICK_STATUS")
        if "Index" in columns and drop_index:
            del columns["Index"]
            to_drop.append("Index")

        # determine start and end dates
        start = kwargs.get("start", utils.adaptive)
        end = kwargs.get("end", utils.adaptive)

        for t in dtype:
            if t not in columns:
                raise ValueError(f"dtype '{t}' not found in columns list")
            columns[t] = dtype[t]

        has_time = False
        if timestamp_name in columns:
            has_time = True
            # remove to resolve exception in Source.__init__
            if timestamp_name == "Time":
                del columns["Time"]

        # redefine start/end time for change_date_to
        if change_date_to:
            start = dt.datetime(change_date_to.year, change_date_to.month, change_date_to.day)
            end = ott.next_day(start)

        if isinstance(filepath_or_buffer, utils.FileBuffer):
            symbols = 'DUMMY'
        else:
            # str, because an os.PathLike object might be passed
            symbols = str(filepath_or_buffer)

        super().__init__(
            _symbols=symbols,
            _start=start,
            _end=end,
            _base_ep_func=partial(
                self.base_ep,
                filepath_or_buffer,
                columns,
                forced_title,
                default_types,
                has_time,
                to_drop,
                timestamp_name,
                change_date_to,
                order_ticks,
                start,
                end,
                first_line_is_title,
                ep_fields,
                converters,
            ),
            **columns,
        )

        # fake run converters to set proper schema
        if converters:
            for column, converter in converters.items():
                self.schema[column] = converter(self[column]).dtype
            if has_time and timestamp_name in self.schema:
                if self.schema[timestamp_name] not in [ott.nsectime, ott.msectime]:
                    raise ValueError(
                        f"CSV converter for {timestamp_name} is converting to {self.schema[timestamp_name]} "
                        "type, but the expected result type is ott.msectime or ott.nsectime"
                    )

        # remove the timestamp_name column, if we use it as the TIMESTAMP source
        if has_time and timestamp_name != "Time":
            del self[timestamp_name]

    def base_ep(
        self,
        filepath_or_buffer,
        columns,
        forced_title,
        default_types,
        has_time,
        to_drop,
        timestamp_name,
        change_date_to,
        order_ticks,
        start,
        end,
        first_line_is_title,
        ep_fields,
        converters={},
    ):
        # initialize Source and set schema to columns.
        file_contents = ''
        if isinstance(filepath_or_buffer, utils.FileBuffer):
            file_contents = filepath_or_buffer.get()

        csv = Source(
            otq.CsvFileListing(
                field_delimiters="','",
                time_assignment="_START_TIME",
                # we don't use the EP's first_line_is_title, because the EP raises an error
                # on an empty column name, and we explicitly define names for such columns
                # in the FIELDS arg.
                # but if the first line starts with # (forced_title=True), then this param is ignored :(
                first_line_is_title=False,
                fields=ep_fields,
                file_contents=file_contents,
            ),
            **columns,
        )

        if first_line_is_title and not forced_title:
            # remove the first line with titles for columns.
            csv.sink(otq.DeclareStateVariables(variables="long __TICK_INDEX=0"))
            csv.sink(otq.PerTickScript("STATE::__TICK_INDEX = STATE::__TICK_INDEX + 1;"))
            csv.sink(otq.WhereClause(discard_on_match=False, where="STATE::__TICK_INDEX > 1"))

        # set tick type to ANY
        csv.tick_type("LOCAL::ANY")

        # check whether types need to be updated, because if a column type is not specified
        # in the header, then by default the column has string type in OneTick
        update_columns = {}
        for name, dtype in columns.items():
            if not issubclass(dtype, str) and name not in default_types:
                update_columns[name] = dtype

        for name, dtype in update_columns.items():
            if dtype is int:
                csv.sink(otq.UpdateField(field=name, value="atol(" + name + ")"))
            elif dtype is float:
                csv.sink(otq.UpdateField(field=name, value="atof(" + name + ")"))
            elif dtype is ott.msectime:
                csv.sink(otq.UpdateField(field=name, value='"1970/01/01 00:00:00.000"', where=name + '=""'))
                csv.sink(otq.UpdateField(field=name,
                                         value=f'parse_time("%Y/%m/%d %H:%M:%S.%q", {name}, _TIMEZONE)'))
            elif dtype is ott.nsectime:
                csv.sink(otq.UpdateField(field=name, value='"1970/1/1 00:00:00.000"', where=name + '=""'))
                csv.sink(otq.UpdateField(field=name,
                                         value=f'parse_nsectime("%Y/%m/%d %H:%M:%S.%J", {name}, _TIMEZONE)'))
            else:
                raise TypeError(f"Unsupported type '{dtype}'")

        # run converters
        if converters:
            for column, converter in converters.items():
                if csv[column].dtype is not otp.nsectime and converter(csv[column]).dtype is otp.nsectime:
                    # workaround to resolve a bug with column type changing:
                    # https://onemarketdata.atlassian.net/browse/PY-416
                    csv[f'_T_{column}'] = converter(csv[column])
                    del csv[column]
                    csv[column] = csv[f'_T_{column}']
                    del csv[f'_T_{column}']
                else:
                    csv[column] = converter(csv[column])

        if has_time:
            # if the timestamp_name column is defined in the csv, then apply tick time adjustment
            if timestamp_name in converters:
                # we assume that if the timestamp_name field is in converters,
                # then it is already converted to otp.dt
                csv.sink(otq.UpdateField(
                    field="TIMESTAMP",
                    value=timestamp_name,
                    allow_unordered_output_times=True,
                ))
            else:
                if change_date_to:
                    change_date_to = change_date_to.strftime("%Y/%m/%d")
                    csv.sink(otq.UpdateField(field="Time",
                                             value=f'"{change_date_to}" + substr({timestamp_name}, 10)'))

                # by default we parse timestamp_name into the TIMESTAMP field
                # from the typical/default Time format of a OneTick dump
                csv.sink(otq.UpdateField(
                    field="TIMESTAMP",
                    value=f'parse_nsectime("%Y/%m/%d %H:%M:%S.%J", {timestamp_name}, _TIMEZONE)',
                    allow_unordered_output_times=True,
                ))
            # drop the source timestamp_name field in favor of the new TIMESTAMP field
            to_drop.append(timestamp_name)
        else:
            # default times for ticks are increasing from 0
            csv.sink(otq.DeclareStateVariables(variables="long __TIMESTAMP_INC__ = 0"))
            csv.sink(otq.UpdateField(field="TIMESTAMP", value="TIMESTAMP + STATE::__TIMESTAMP_INC__"))
            csv.sink(otq.UpdateField(field="STATE::__TIMESTAMP_INC__", value="STATE::__TIMESTAMP_INC__ + 1"))

        if order_ticks:
            csv.sort('TIMESTAMP', inplace=True)

        if to_drop:
            csv.sink(otq.Passthrough(fields=",".join(to_drop), drop_fields="True"))

        return csv
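

# Usage sketch (illustrative; 'ticks.csv' is a hypothetical file): change_date_to
# re-dates ticks taken from the file's timestamp column onto one specific day and
# narrows the query range to that day, as implemented in __init__ above.
#
#   data = otp.CSV('ticks.csv', change_date_to=dt.date(2022, 1, 3))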
class Trades(Source):
    """
    Trade source object. Adds the 'PRICE' and 'SIZE' fields to the schema.
    """

    def __init__(self, db=utils.adaptive_to_default, symbol=utils.adaptive, date=None,
                 start=utils.adaptive, end=utils.adaptive, **kwargs):
        if db is utils.adaptive_to_default:
            db = configuration.config.default_db
        if date:
            start, end = date.start, date.end
        super().__init__(_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs)
        self.schema['PRICE'] = float
        self.schema['SIZE'] = int

    def base_ep(self, db):
        db = str(db)
        src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        src.tick_type(db + "::TRD")
        return src


class Quotes(Source):
    def __init__(self, db=utils.adaptive_to_default, symbol=utils.adaptive,
                 start=utils.adaptive, end=utils.adaptive, **kwargs):
        if db is utils.adaptive_to_default:
            db = configuration.config.default_db
        super().__init__(_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs)
        self.schema['ASK_PRICE'] = float
        self.schema['BID_PRICE'] = float
        self.schema['ASK_SIZE'] = int
        self.schema['BID_SIZE'] = int

    def base_ep(self, db):
        db = str(db)
        src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        src.tick_type(db + "::QTE")
        return src


class NBBO(Source):
    def __init__(self, db="TAQ_NBBO", symbol=utils.adaptive,
                 start=utils.adaptive, end=utils.adaptive, **kwargs):
        super().__init__(_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs)
        self.schema['ASK_PRICE'] = float
        self.schema['BID_PRICE'] = float
        self.schema['ASK_SIZE'] = int
        self.schema['BID_SIZE'] = int

    def base_ep(self, db):
        db = str(db)
        src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        src.tick_type(db + "::NBBO")
        return src
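

# Usage sketch (illustrative): these thin wrappers only pin the tick type
# (TRD/QTE/NBBO) and pre-populate the schema.
#
#   trd = otp.Trades(db='SOME_DB', symbol='S1')   # PRICE/SIZE in the schema
#   qte = otp.Quotes(db='SOME_DB', symbol='S1')   # ASK/BID price and size
#   # df = otp.run(trd)  # assumes the database actually has a TRD tick type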
class Query(Source):
    def __init__(
        self,
        query_object=None,
        out_pin=utils.adaptive,
        symbol=utils.adaptive,
        start=utils.adaptive,
        end=utils.adaptive,
        params=None,
        **kwargs,
    ):
        """
        Create a data source object from an otq file or a query object

        Parameters
        ----------
        query_object: path or :class:`query`
            query to use as a data source
        out_pin: str
            query output pin name
        symbol: str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`
            Symbol(s) from which data should be taken.
        start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>` or utils.adaptive
            Time interval from which the data should be taken.
        params: dict
            params to pass to the query. Only applicable to a string ``query_object``
        """
        if self._try_default_constructor(**kwargs):
            return

        if params is None:
            params = {}

        # Ignore because of the "Only @runtime_checkable protocols can be used with instance and class checks"
        if isinstance(query_object, (str, os.PathLike)):  # type: ignore
            query_object = query(str(query_object), **params)
        elif isinstance(query_object, query):
            if len(params) > 0:
                raise ValueError("Cannot pass both params and a query() (not str) query_object parameter")
        else:
            raise ValueError("query_object parameter has to be either a str (path to the query) or a query object")

        if symbol == utils.adaptive:
            if not query_object.graph_info.has_unbound_sources:
                symbol = None

        super().__init__(
            _symbols=symbol,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(query_object, out_pin),
            **kwargs,
        )

    def base_ep(self, query, out_pin):
        nested = otq.NestedOtq(query.path, query.str_params)
        graph = query.graph_info

        if out_pin is utils.adaptive:
            if len(graph.nested_outputs) == 1:
                return Source(nested[graph.nested_outputs[0].NESTED_OUTPUT])
            elif len(graph.nested_outputs) > 1:
                raise Exception(
                    f'Query "{query.query_name}" has multiple outputs, but you have not '
                    'specified which one should be used. You could specify it '
                    'using the "out_pin" parameter of the Query constructor.'
                )
            else:
                # no output
                return Source(nested, _has_output=False)
        else:
            existed_out_pins = set(map(operator.attrgetter("NESTED_OUTPUT"), graph.nested_outputs))
            if out_pin not in existed_out_pins:
                raise Exception(
                    f'Query "{query.query_name}" does not have the "{out_pin}" output; only the following '
                    f"output pins exist: {','.join(existed_out_pins)}"
                )
            return Source(nested[out_pin])
class query:
    """
    Constructs a query object with a certain path. Keyword arguments specify query parameters.
    You also can pass an instance of the ``otp.query.config`` class as the second
    positional argument to configure the query.

    Parameters
    ----------
    path : str
        path to an .otq file. If the path is relative, then it's assumed that the file
        is located in one of the directories specified in the OneTick ``OTQ_FILE_PATH``
        configuration variable.
        If there is more than one query in the file, then the query name should be
        specified in the format ``<path>::<query-name>``.
        Also the prefix ``remote://<database-name>::`` can be used to specify that
        the query is located on a remote server.
    config: optional
        ``otp.query.config`` object.
    params:
        parameters for the query.

    Raises
    ------
    ValueError, TypeError

    Examples
    --------
    >>> otp.query('/otqs/some.otq::some_query', PARAM1='val1', PARAM2=3.14)  # doctest: +SKIP
    >>> otp.query('remote://DATABASE::/otqs/some.otq::some_query', PARAM1='val1', PARAM2=3.14)  # doctest: +SKIP
    """
    class config:
        """
        The config allows to specify different query options.
        """

        special_values = {"input"}

        def __init__(self, output_columns=None):
            """
            Parameters
            ----------
            output_columns : str, list, dict, optional
                The parameter defines what the output columns are.

                Default value is ``None``, which means no output fields
                after applying the query for every output pin.

                The ``input`` value means that the output columns are the same
                as the input ones for every output pin.

                A list of tuples allows to define output columns along with their types,
                for example ``[('x', int), ('y', float), ...]``. Applicable for every output pin.

                A dict allows to specify output columns for every output pin.

            Raises
            ------
            TypeError, ValueError
            """
            if output_columns is not None:
                if isinstance(output_columns, list):
                    self.validate_columns(output_columns)
                elif isinstance(output_columns, dict):
                    for pin, columns in output_columns.items():
                        if not isinstance(pin, str):
                            raise TypeError(f"Name of pin '{type(pin)}' is of non-str type")
                        else:
                            self.validate_columns(columns)
                elif not isinstance(output_columns, str):
                    raise TypeError(f'"output_columns" does not support value of the "{type(output_columns)}" type')

            if isinstance(output_columns, str):
                if output_columns not in self.special_values:
                    raise ValueError(f'Config does not support "{output_columns}" value')

            self.output_columns = output_columns
        def validate_list_item(self, item):
            if isinstance(item, str):
                if item not in self.special_values:
                    raise ValueError(f"Value {item} is not supported.")
            else:
                if not isinstance(item, (tuple, list)) or (len(item) != 2) or not isinstance(item[0], str):
                    raise TypeError(f"Value {item} is not a name-type tuple.")
        def validate_columns(self, columns):
            if isinstance(columns, str):
                if columns not in self.special_values:
                    raise ValueError(f"A pin has invalid output columns definition: '{columns}'")
            elif isinstance(columns, list):
                if columns.count("input") > 1:
                    raise ValueError(f"More than one 'input' value in {columns}")
                for item in columns:
                    self.validate_list_item(item)
            else:
                raise TypeError(f"A pin's columns definition is of unsupported type '{type(columns)}'")
        def get_output_columns_for_pin(self, out_pin_name):
            if isinstance(self.output_columns, dict):
                if out_pin_name not in self.output_columns:
                    raise ValueError(f"Pin {out_pin_name} wasn't declared in the config")
                else:
                    return self.output_columns[out_pin_name]
            else:
                return self.output_columns
        def apply(self, out_pin_name, src):
            """
            Applies the specified logic to a certain object.
            Used internally in functions.apply_query
            """
            columns_descriptor = self.get_output_columns_for_pin(out_pin_name)

            if columns_descriptor is None:
                # drop columns by default, because we don't know
                # how an external query changes the data schema
                src.drop_columns()
            elif columns_descriptor == "input":
                pass
            else:
                if "input" not in columns_descriptor:
                    src.drop_columns()
                for item in columns_descriptor:
                    if item != "input":
                        src[item]
    def __init__(self, path, *config, **params):
        path = str(path)
        if path.startswith('remote://'):
            self.path = path
            remote, path = path.split('::', maxsplit=1)
        else:
            self.path = f"remote://{configuration.config.get('default_db', 'LOCAL')}::" + path

        self.query_path, self.query_name = utils.query_to_path_and_name(path)

        # if query_path does not exist, then we try
        # to resolve it with OTQ_PATH assuming that
        # a relative path is passed
        if not os.path.exists(self.query_path):
            otq_path = utils.get_config_param(os.environ["ONE_TICK_CONFIG"], "OTQ_FILE_PATH", "")
            self.query_path = utils.abspath_to_query_by_otq_path(otq_path, self.query_path)

        if self.query_name is None:
            # it seems that the query name was not passed, so try to find it
            queries = query_inspector.get_queries(self.query_path)
            if len(queries) > 1:
                raise Exception(f"{self.query_path} has more than one query, "
                                f"but you have not specified which one to use.")
            self.query_name = queries[0]

        # prepare parameters
        self._str_params = None
        self.params = params
        self.update_params()

        # prepare configs
        if len(config) > 1:
            raise ValueError(f"It is allowed to specify only one config object, but {len(config)} were passed")
        elif len(config) == 1:
            if not isinstance(config[0], self.config):
                raise TypeError(f'It is expected to see config of the "query.config" type, '
                                f'but got "{type(config[0])}"')
            self.config = config[0]
        else:
            self.config = self.config()

        self.graph_info = query_inspector.get_query_info(self.query_path, self.query_name)

    def __call__(self, *ticks, **pins):
        for key, value in pins.items():
            if not isinstance(value, Source):
                raise ValueError(f'Input "{key}" pin does not support "{type(value)}" type')

        if len(pins) == 0 and len(ticks) == 1:
            if len(self.graph_info.nested_inputs) != 1:
                raise Exception(f'The query "{self.query_path}" is expected to have one input, but it'
                                f" has {len(self.graph_info.nested_inputs)}")
            pins[self.graph_info.nested_inputs[0].NESTED_INPUT] = ticks[0]
        elif len(pins) > 0 and len(ticks) == 0:
            pass
        elif len(pins) == 0 and len(ticks) == 0:
            # it is the valid case, when the query has no input pins
            pass
        else:
            raise ValueError("It is allowed to pass only one non-specified input")

        outputs = self._outputs()
        outputs.query = self
        outputs.in_sources = pins

        return outputs

    class _outputs(object):
        def __getitem__(self, key):
            output_pins = []
            if type(key) is tuple:
                output_pins = list(key)
            elif isinstance(key, str):
                output_pins = [key]
            elif key is None:
                # No output
                pass
            else:
                raise ValueError(f'Output pins can not be of "{type(key)}" type')

            return onetick.py.functions.apply_query(
                self.query, in_sources=self.in_sources, output_pins=output_pins, **self.query.params
            )
    def to_eval_string(self):
        """Converts query object to `eval` string"""
        res = '"' + self.path + '"'
        if self.params:
            res += f', "{self._params_to_str(self.params, with_expr=True)}"'
        return "eval(" + res + ")"
    @property
    def str_params(self):
        """Query parameters converted to string"""
        if self._str_params is None:
            self._str_params = self._params_to_str(self.params)
        return self._str_params

    @staticmethod
    def _params_to_str(params, *, with_expr=False):
        """
        Converts params to str

        Parameters
        ----------
        params: dict
            Parameters as dict(name=value)
        with_expr:
            If true, wrap every expression in the expr() function

        Returns
        -------
        result: str
            string representation of parameters ready for query evaluation
        """

        def to_str(v):
            if isinstance(v, list):
                return "\\,".join(map(to_str, v))
            else:
                if with_expr:
                    is_dt = ott.is_time_type(v)
                    if is_dt:
                        v = ott.value2str(v)
                    result = query._escape_quotes_in_eval(v)
                    if isinstance(v, _Operation) and getattr(v, "name", None) != "_SYMBOL_NAME" or is_dt:
                        result = f"expr({result})"
                else:
                    result = query._escape_characters_in_query_param(str(v))
                return result

        return ",".join(key + "=" + to_str(value) for key, value in params.items())

    @staticmethod
    def _escape_quotes_in_eval(v):
        return str(v).translate(str.maketrans({"'": r"\'", '"': r'\"'}))

    @staticmethod
    def _escape_characters_in_query_param(result):
        # 0 - no need to add backslash, 1 - need to add
        char_map = [0] * len(result)

        # put 1 between two quote symbols
        open_char = None
        last_inx = 0
        for inx, c in enumerate(result):
            if open_char == c:
                open_char = None
                continue
            if not open_char and c == "'" or c == '"':
                open_char = c
                last_inx = inx + 1
                continue
            if open_char:
                char_map[inx] = 1

        # clean open tail if necessary
        if open_char:
            char_map[last_inx:] = [0] * (len(result) - last_inx)

        # apply mapping
        res = []
        last_esc = False  # do not add esc if the previous one is already esc
        n_brackets_in_expr_block = 0  # do not escape in expr(...)
        for inx, c in enumerate(result):
            if c == "(":
                if n_brackets_in_expr_block:
                    n_brackets_in_expr_block += 1
                elif result[inx - 4:inx] == "expr":
                    n_brackets_in_expr_block = 1
            if c == ")" and n_brackets_in_expr_block:
                n_brackets_in_expr_block -= 1
            if c in _QUERY_PARAM_SPECIAL_CHARACTERS and char_map[inx] == 0:
                if not last_esc and not n_brackets_in_expr_block:
                    c = "\\" + c
            last_esc = c == "\\"
            res.append(c)
        return "".join(res)
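

# Usage sketch (illustrative; 'my_queries.otq::flow' and the 'OUT' pin are
# hypothetical names): running a saved .otq query against a source and keeping
# the input columns on the output pin.
#
#   q = otp.query('my_queries.otq::flow', otp.query.config(output_columns='input'))
#   data = otp.Ticks(X=[1, 2, 3])
#   result = q(data)['OUT']   # 'OUT' must be an output pin of the nested query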
class Orders(Source):
    def __init__(self, db="S_ORDERS_FIX", symbol=utils.adaptive,
                 start=utils.adaptive, end=utils.adaptive, **kwargs):
        super().__init__(_symbols=symbol, _start=start, _end=end, _base_ep_func=lambda: self.base_ep(db), **kwargs)
        self.schema['ID'] = str
        self.schema['BUY_FLAG'] = int
        self.schema['SIDE'] = str
        self.schema['STATE'] = str
        self.schema['ORDTYPE'] = str
        self.schema['PRICE'] = float
        self.schema['PRICE_FILLED'] = float
        self.schema['QTY'] = int
        self.schema['QTY_FILLED'] = int

    def base_ep(self, db):
        db = str(db)
        src = Source(otq.Passthrough(fields="SYMBOL_NAME,TICK_TYPE", drop_fields=True))
        src.tick_type(db + "::ORDER")
        return src


_db_doc = _param_doc(
    name='db',
    desc="""
    Name(s) of the database or the database object(s).
    """,
    str_annotation='str, list of str, :class:`otp.DB <onetick.py.DB>`',
    default=None,
    str_default='None',
)
_symbol_doc = _param_doc(
    name='symbol',
    desc="""
    Symbol(s) from which data should be taken.
    """,
    str_annotation='str, list of str, :class:`Source`, :class:`query`, :py:func:`eval query <onetick.py.eval>`',
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_tick_type_doc = _param_doc(
    name='tick_type',
    desc="""
    Tick type of the data.
    If not specified, all ticks from `db` will be taken.
    If ticks can't be found or there are many databases specified in `db`,
    then the default is "TRD".
    """,
    str_annotation='str, list of str',
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_start_doc = _param_doc(
    name='start',
    desc="""
    Start of the interval from which the data should be taken.
    Default is :py:class:`onetick.py.adaptive`, making the final query deduce
    the time limits from the rest of the graph.
    """,
    str_annotation=(':py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`,'
                    ' :py:class:`onetick.py.adaptive`'),
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_end_doc = _param_doc(
    name='end',
    desc="""
    End of the interval from which the data should be taken.
    Default is :py:class:`onetick.py.adaptive`, making the final query deduce
    the time limits from the rest of the graph.
    """,
    str_annotation=(':py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`,'
                    ' :py:class:`onetick.py.adaptive`'),
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_date_doc = _param_doc(
    name='date',
    desc="""
    Allows to specify a whole day instead of passing the ``start`` and ``end`` parameters explicitly.
    If it is set along with the ``start`` and ``end`` parameters, then the last two are ignored.
    """,
    str_annotation=":class:`datetime.datetime`, :class:`otp.datetime <onetick.py.datetime>`, optional",
    default=None,
)
_schema_policy_doc = _param_doc(
    name='schema_policy',
    desc="""
    Schema deduction policy:

    - 'manual'
        The resulting schema is a combination of ``desired_schema`` and the database schema.
        Compatibility with the database schema will not be checked.

    - 'manual_strict'
        The resulting schema will be exactly ``desired_schema``.
        Compatibility with the database schema will not be checked.

    - 'tolerant'
        The resulting schema is a combination of ``desired_schema`` and the database schema.
        If the database schema can be deduced, it's checked to be type-compatible with
        ``desired_schema``, and a ValueError is raised if the checks fail.
        Also, with this policy the database is scanned 5 days back to find the schema.
        It is useful when the database is misconfigured or in case of holidays.

    - 'tolerant_strict'
        The resulting schema will be ``desired_schema`` if it's not empty.
        Otherwise, the database schema is used.
        If the database schema can be deduced, it's checked whether it lacks fields
        from ``desired_schema`` and it's checked to be type-compatible with
        ``desired_schema``, and a ValueError is raised if the checks fail.
        Also, with this policy the database is scanned 5 days back to find the schema.
        It is useful when the database is misconfigured or in case of holidays.

    - 'fail'
        The same as 'tolerant', but if the database schema can't be deduced, raises an Exception.

    - 'fail_strict'
        The same as 'tolerant_strict', but if the database schema can't be deduced, raises an Exception.
    """,
    str_annotation="'tolerant', 'tolerant_strict', 'fail', 'fail_strict', 'manual', 'manual_strict'",
    default=None,
)
_guess_schema_doc = _param_doc(
    name='guess_schema',
    desc="""
    .. deprecated:: 1.3.16

    Use the ``schema_policy`` parameter instead.
    """,
    annotation=bool,
    default=None,
)
_identify_input_ts_doc = _param_doc(
    name='identify_input_ts',
    desc="""
    If set to False, the fields SYMBOL_NAME and TICK_TYPE are not appended to the output ticks.
    """,
    annotation=bool,
    default=False,
)
_back_to_first_tick_doc = _param_doc(
    name='back_to_first_tick',
    desc="""
    Determines how far back to go looking for the latest tick before ``start`` time.
    If one is found, it is inserted into the output time series with the timestamp set to ``start`` time.
    Note: it will be rounded to int, so otp.Millis(999) will be 0 seconds.
    """,
    str_annotation=('int, :ref:`offset <datetime_offsets>`, '
                    ':class:`otp.expr <onetick.py.expr>`, '
                    ':py:class:`~onetick.py.Operation`'),
    default=0,
)
_keep_first_tick_timestamp_doc = _param_doc(
    name='keep_first_tick_timestamp',
    desc="""
    If set, a new field with this name will be added to the source.
    This field contains the original timestamp of the tick that was taken
    from before the start time of the query.
    For all other ticks the value in this field will be equal to the value of the Time field.
    This parameter is ignored if ``back_to_first_tick`` is not set.
    """,
    annotation=str,
    default=None,
)
_presort_doc = _param_doc(
    name='presort',
    desc="""
    Add the presort EP in case of bound symbols.
    Applicable only when ``symbols`` is not None.
    By default, it is set to True if ``symbols`` are set and to False otherwise.
    """,
    annotation=bool,
    default=utils.adaptive,
    str_default=' :py:class:`onetick.py.adaptive`',
)
_concurrency_doc = _param_doc(
    name='concurrency',
    desc="""
    Specifies the number of CPU cores to utilize for the ``presort``.
    By default, the value from otp.config.default_concurrency is used.
    """,
    annotation=int,
    default=None,
)
_batch_size_doc = _param_doc(
    name='batch_size',
    desc="""
    Specifies the query batch size for the ``presort``.
    By default, the value from otp.config.default_batch_size is used.
    """,
    annotation=int,
    default=None,
)
_desired_schema_doc = _param_doc(
    name='desired_schema',
    desc="""
    List of <column name> -> <column type> pairs that the source is expected to have.
    If the type is irrelevant, provide None as the type in question.
    """,
    str_annotation='type[str]',
    kind=inspect.Parameter.VAR_KEYWORD,
)

DATA_SOURCE_DOC_PARAMS = [
    _db_doc, _symbol_doc, _tick_type_doc,
    _start_doc, _end_doc, _date_doc,
    _schema_policy_doc, _guess_schema_doc,
    _identify_input_ts_doc,
    _back_to_first_tick_doc, _keep_first_tick_timestamp_doc,
    _presort_doc, _concurrency_doc, _batch_size_doc,
    _desired_schema_doc,
]
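

# Note (illustrative): the _param_doc entries above are not used directly at runtime;
# DATA_SOURCE_DOC_PARAMS is consumed by the @docstring decorator on
# DataSource.__init__ below, which renders each entry into the "Parameters"
# section of the generated documentation.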
[docs]classDataSource(Source):POLICY_MANUAL="manual"POLICY_MANUAL_STRICT="manual_strict"POLICY_TOLERANT="tolerant"POLICY_TOLERANT_STRICT="tolerant_strict"POLICY_FAIL="fail"POLICY_FAIL_STRICT="fail_strict"_VALID_POLICIES=frozenset([POLICY_MANUAL,POLICY_MANUAL_STRICT,POLICY_TOLERANT,POLICY_TOLERANT_STRICT,POLICY_FAIL,POLICY_FAIL_STRICT])_PROPERTIES=Source._PROPERTIES+["_p_db","_p_strict","_p_schema"]def__get_schema(self,db,start,schema_policy):schema={}ifstartisutils.adaptive:start=None# means that use the last date with dataifisinstance(db,list):''' This case of a merge, since we need to get combined schema across different tick types and dbs '''fort_dbindb:_db=t_db.split(':')[0]_tt=t_db.split(':')[-1]db_obj=onetick.py.db._inspection.DB(_db)ifschema_policy==self.POLICY_TOLERANTandstart:# repeating the same logic as in db_obj.last_datestart=db_obj.last_not_empty_date(start,days_back=5,tick_type=_tt)schema.update(db_obj.schema(date=start,tick_type=_tt))ifdbisNoneorisinstance(db,_SymbolParamColumn):''' In this case we can't get schema, because db is calculated dynamicatlly. Set to empty to indicate that in this case we expect the manualy set schema. '''schema={}returnschemadef__prepare_schema(self,db,start,end,schema_policy,guess_schema,desired_schema):ifguess_schemaisnotNone:warnings.warn("guess_schema flag is deprecated; use schema_policy argument instead",DeprecationWarning,)ifschema_policyisnotNone:raiseValueError("guess_schema and schema_policy cannot be set at the same time")ifguess_schema:schema_policy=self.POLICY_FAILelse:schema_policy=self.POLICY_MANUALifschema_policyisNone:schema_policy=self.POLICY_TOLERANTifschema_policynotinself._VALID_POLICIES:raiseValueError(f"Invalid schema_policy; allowed values are: {self._VALID_POLICIES}")actual_schema={}ifschema_policynotin(self.POLICY_MANUAL,self.POLICY_MANUAL_STRICT):actual_schema=self.__get_schema(db,start,schema_policy)dbs=', '.join(dbifisinstance(db,list)else[])iflen(actual_schema)==0:ifschema_policyin(self.POLICY_FAIL,self.POLICY_FAIL_STRICT):raiseException(f'No ticks found in database(-s) {dbs}')# lets try to use at least somethingreturndesired_schema.copy()fork,vindesired_schema.items():field_type=actual_schema.get(k,None)iffield_typeisNone:ifself._p_strictorschema_policyin(self.POLICY_TOLERANT,self.POLICY_FAIL):raiseValueError(f"Database(-s) {dbs} schema has no {k} field")elifnotissubclass(field_type,v):raiseValueError(f"Database(-s) {dbs} schema field {k} has type {field_type}, but {v} was requested")ifnotself._p_strict:desired_schema.update(actual_schema)table_schema=desired_schema.copy()ifnotself._p_strict:# in this case we will table only fields specified by usertable_schema={k:vfork,vintable_schema.items()ifknotinactual_schema}returntable_schemadef__prepare_dates(self,date):ifisinstance(date,ott.datetime)orisinstance(date,ott.date):start=date.startend=date.endifisinstance(date,dt.datetime)orisinstance(date,dt.date):start=dt.datetime(date.year,date.month,date.day)end=start+dt.timedelta(days=1,milliseconds=-1)returnstart,enddef__prepare_db_tick_type(self,db,tick_type,symbol,start,end):ifisinstance(db,list):''' If everything is correct then this branch should leave the `db` var as a list of databases with tick types and the `tick_type` var is None. Valid cases: - Fully defined case. The `db` parameter has a list of databases where every database has a tick type, when the `tick_type` parameter has default value or None (for backward compatibility) - Partially defined case. 
The `db` parameter has a list of databases but not every database has a tick type, and meantime the `tick_type` is passed to not None value. In that case databases without tick type are exetended with a tick type from the `tick_type` parameter - No defined case. The `db` parameter has a list of databases and every database there has no tick type, and the `tick_type` is set to not None value. In that case every database is extended with the tick type from the `tick_type`. '''defdb_converter(_db):ifisinstance(_db,DB):return_db.nameelse:return_dbdb=[db_converter(_db)for_dbindb]res=all(('::'in_dband_db[-1]!=':'for_dbindb))ifres:iftick_typeisutils.adaptiveortick_typeisNone:tick_type=None# tick types is specified for all databaseselse:raiseException('The `tick_type` is set as a parameter ''and also as a part of the `db` parameter''for every database')else:dbs_without_tt=[_db.split(':')[0]for_dbindbif'::'notin_dbor_db[-1]==':']iftick_typeisutils.adaptive:tick_type='TRD'# default one for backward compatibility and testing usecaseiftick_typeisNone:raiseException('The tick type is not set for databases: '+', '.join(dbs_without_tt))else:# extend databases with missing tick types from the tick tick parameterdbs_with_tt=[_dbfor_dbindbif'::'in_dband_db[-1]!=':']db=dbs_with_tt+[_db+'::'+tick_typefor_dbindbs_without_tt]tick_type=Noneifisinstance(db,(DB,inspect_DB)):db=db.name# ... and we go to the next branchifisinstance(db,str):''' The resulting `db` var contains a list with string value, that has the `db` concatenated with the `tick_type`. '''if'::'indb:iftick_typeisutils.adaptiveortick_typeisNone:tick_type=db.split(':')[-1]db=db.split('::')[0]else:raiseException('The `tick_type` is set as a parameter ''and also as a part of the `db` parameter')else:iftick_typeisutils.adaptiveortick_typeisNone:db_obj=onetick.py.db._inspection.DB(db)# try to find at least one common tick type# through all daystick_types=Noneifstartisutils.adaptive:start=db_obj.last_dateend=db_obj.last_dateifstartandend:# could be None if there is no datat_start=startwhilet_start<=end:t_tts=set(db_obj.tick_types(t_start))t_start+=dt.timedelta(days=1)iflen(t_tts)==0:continueiftick_typesisNone:tick_types=t_ttselse:tick_types&=t_ttsiflen(tick_types)==0:raiseException(f'It seems that there is no common 'f'tick types for dates from {start} 'f'to {end}. Please specify a tick ''type')iftick_typesisNone:iftick_typeisutils.adaptive:tick_types=['TRD']# the default oneelse:raiseException(f'Could not find any data in from {start} 'f' to {end}. Could you check that tick type, '' database and date range are correct.')iflen(tick_types)!=1:raiseException('The tick type is not specified, found ''multiple tick types in the database : '+', '.join(tick_types))tick_type=tick_types.pop()ifnotisinstance(tick_type,str)andisinstance(tick_type,Iterable):db=[f'{db}::{tt}'forttintick_type]else:db=[db+'::'+tick_type]tick_type=Noneifisinstance(db,_SymbolParamColumn):''' Do nothing, because we don't know whether db will come with the tick type or not. 
The only one thing that definetely we know that tick_type can not be utils.adpative '''iftick_typeisutils.adaptive:# TODO: need to test this caseraiseException('The `db` is set to the symbol param, in that case ''the `tick_type` should be set explicitly to some value ''or to None')ifdbisNone:''' This case means that database comes with the symbol name, then tick type should be defined '''iftick_typeisutils.adaptiveortick_typeisNone:raiseException('The `db` is not specified that means database is ''expected to be defined with the symbol name. ''In that case the `tick_type` should be defined.')ifnotisinstance(tick_type,str)andisinstance(tick_type,Iterable):tick_type='+'.join(tick_type)returndb,tick_type@docstring(parameters=DATA_SOURCE_DOC_PARAMS,add_self=True)def__init__(self,db=None,symbol=utils.adaptive,tick_type=utils.adaptive,start=utils.adaptive,end=utils.adaptive,date=None,schema_policy=None,guess_schema=None,identify_input_ts=False,back_to_first_tick=0,keep_first_tick_timestamp=None,*,symbols=None,presort=utils.adaptive,batch_size=None,concurrency=None,**desired_schema,):""" Construct a source providing data from a given ``db``. Examples --------- Symbol can be a collection >>> # OTdirective: snippet-name:fetch data.simple; >>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=['S1', 'S2']) >>> otp.run(data) Time X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.000 -3 2 2003-12-01 00:00:00.001 2 3 2003-12-01 00:00:00.001 -2 4 2003-12-01 00:00:00.002 3 5 2003-12-01 00:00:00.002 -1 Source also can be passed as symbols, in such case magic named column SYMBOL_NAME will be transform to symbol and all other columns will be symbol parameters >>> # OTdirective: snippet-name:fetch data.symbols as a source; >>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2']) >>> data = otp.DataSource(db='SOME_DB', symbols=symbols, tick_type='TT') >>> otp.run(data) Time X 0 2003-12-01 00:00:00.000 1 1 2003-12-01 00:00:00.000 -3 2 2003-12-01 00:00:00.001 2 3 2003-12-01 00:00:00.001 -2 4 2003-12-01 00:00:00.002 3 5 2003-12-01 00:00:00.002 -1 Default schema policy is `tolerant`. >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=float, date=otp.dt(2022, 3, 1)) >>> data.schema {'PRICE': <class 'float'>, 'SIZE': <class 'int'>} >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=int, date=otp.dt(2022, 3, 1)) Traceback (most recent call last): ... ValueError: Database(-s) NYSE_TAQ::TRD schema field PRICE has type <class 'float'>, but <class 'int'> was requested Schema policy `manual` uses exactly desired_schema: >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', PRICE=float, \ date=otp.dt(2022, 3, 1), schema_policy='manual') >>> data.schema {'PRICE': <class 'float'>} Schema policy 'fail' raise an exception if the schema cannot be deduced >>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2021, 3, 1), \ schema_policy='fail') Traceback (most recent call last): ... 
Exception: No ticks found in database(-s) NYSE_TAQ::TRD """ifself._try_default_constructor(**desired_schema):return# for cases when we want to explicitly convert into string,# it might be symbol param or join_with_query parameterifisinstance(tick_type,_ParamColumn):tick_type=str(tick_type)[1:-1]ifdate:# TODO: write a warning in that casestart,end=self.__prepare_dates(date)db,tick_type=self.__prepare_db_tick_type(db,tick_type,symbol,start,end)self._p_db=dbself._p_strict=schema_policyin(self.POLICY_FAIL_STRICT,self.POLICY_TOLERANT_STRICT,self.POLICY_MANUAL_STRICT)self._p_schema=self.__prepare_schema(db,# tick type is embedded into the dbstart,end,schema_policy,guess_schema,desired_schema)ifsymbolsisnotNone:ifsymbolisutils.adaptiveorsymbolisNone:symbol=symbolselse:# TODO: test itraiseException('You have set the `symbol` and `symbols` parameters''together, it is not allowed. Please, clarify parameters')ifisinstance(symbol,Symbols)andsymbol._p_dbisNone:symbol=Symbols.duplicate(symbol,db=db)ifidentify_input_ts:if"SYMBOL_NAME"indesired_schema:raiseException()# TODO: think about how user could workaround itdesired_schema["SYMBOL_NAME"]=strif"TICK_TYPE"indesired_schema:raiseException()desired_schema["TICK_TYPE"]=str# unobvious way to convert otp.Minute/Hour/... to number of secondsiftype(back_to_first_tick).__name__=='_DatePartCls':back_to_first_tick=int((ott.dt(0)+back_to_first_tick).timestamp())ifisinstance(back_to_first_tick,_Operation):back_to_first_tick=otp.expr(back_to_first_tick)ifback_to_first_tick!=0andkeep_first_tick_timestamp:desired_schema[keep_first_tick_timestamp]=ott.nsectimeif(isinstance(symbol,Source)orhasattr(symbol,"__iter__")andnotisinstance(symbol,dict)andnotisinstance(symbol,str)orisinstance(symbol,query)orisinstance(symbol,_QueryEvalWrapper)):super().__init__(_start=start,_end=end,_base_ep_func=lambda:self._base_ep_for_cross_symbol(db,symbol,tick_type,identify_input_ts=identify_input_ts,back_to_first_tick=back_to_first_tick,keep_first_tick_timestamp=keep_first_tick_timestamp,presort=presort,batch_size=batch_size,concurrency=concurrency,),**desired_schema)else:super().__init__(_symbols=symbol,_start=start,_end=end,_base_ep_func=lambda:self.base_ep(db,tick_type,identify_input_ts=identify_input_ts,back_to_first_tick=back_to_first_tick,keep_first_tick_timestamp=keep_first_tick_timestamp,),**desired_schema)@propertydefdb(self):returnself._p_db@staticmethoddef_create_source(passthrough_ep,back_to_first_tick=0,keep_first_tick_timestamp=None):"""Create graph that save original timestamp of first tick if needed"""ifback_to_first_tick!=0andkeep_first_tick_timestamp:src=Source(otq.Passthrough())src.sink(otq.AddField(field=keep_first_tick_timestamp,value='TIMESTAMP'))src.sink(passthrough_ep)returnsrcreturnSource(passthrough_ep)def_table_schema(self,src):returnsrc.table(**self._p_schema,strict=self._p_strict)defbase_ep(self,db,tick_type,identify_input_ts,back_to_first_tick=0,keep_first_tick_timestamp=None):ifdbisnotNone:ifisinstance(db,list):str_db="+".join(db)else:str_db=str(db)iftick_type:ifisinstance(db,_SymbolParamColumn):str_db=f"expr({str_db} + '::{tick_type}')"# TODO: testelse:if"::"notinstr_db:str_db+="::"+tick_typeelse:ifisinstance(db,_SymbolParamColumn):str_db=f"expr({str_db})"# TODO: 
        else:
            str_db = tick_type

        if isinstance(db, list) or isinstance(db, _SymbolParamColumn):
            src = self._create_source(
                otq.Passthrough(go_back_to_first_tick=back_to_first_tick),
                back_to_first_tick=back_to_first_tick,
                keep_first_tick_timestamp=keep_first_tick_timestamp,
            )
            src.sink(otq.Merge(identify_input_ts=identify_input_ts))
        else:
            params = dict(go_back_to_first_tick=back_to_first_tick)
            if identify_input_ts:
                params["fields"] = "SYMBOL_NAME,TICK_TYPE"
                params["drop_fields"] = True
            src = self._create_source(
                otq.Passthrough(**params),
                back_to_first_tick=back_to_first_tick,
                keep_first_tick_timestamp=keep_first_tick_timestamp,
            )

        src.tick_type(str_db)
        src = self._table_schema(src)
        return src

    def _base_ep_for_cross_symbol(
        self,
        db,
        symbol,
        tick_type,
        identify_input_ts,
        back_to_first_tick=0,
        keep_first_tick_timestamp=None,
        presort=utils.adaptive,
        batch_size=None,
        concurrency=None,
    ):
        tmp_otq = TmpOtq()
        if isinstance(symbol, _QueryEvalWrapper):
            symbol = symbol.to_eval_string(tmp_otq=tmp_otq)
        elif isinstance(symbol, query):
            symbol = symbol.to_eval_string()
        elif isinstance(symbol, Source):
            symbol = self._convert_symbol_to_string(symbol, tmp_otq)

        if db is not None:
            if isinstance(db, list):
                tick_type = "+".join(db)
            else:
                tick_type = f"{db}::{tick_type}"

        src = self._create_source(
            otq.Passthrough(go_back_to_first_tick=back_to_first_tick),
            back_to_first_tick=back_to_first_tick,
            keep_first_tick_timestamp=keep_first_tick_timestamp,
        )

        if presort is utils.adaptive:
            presort = True
        if presort:
            if batch_size is None:
                batch_size = otp.config.default_batch_size
            if concurrency is None:
                concurrency = (
                    otp.config.default_concurrency
                    if otp.config.default_concurrency is not None
                    else ''  # otq.Presort does not support None
                )
            src.sink(
                otq.Presort(batch_size=batch_size, max_concurrency=concurrency)
                .symbols(symbol)
                .tick_type(tick_type)
            )
            src.sink(otq.Merge(identify_input_ts=identify_input_ts))
        else:
            src.sink(
                otq.Merge(identify_input_ts=identify_input_ts)
                .symbols(symbol)
                .tick_type(tick_type)
            )

        src._tmp_otq.merge(tmp_otq)
        src = self._table_schema(src)
        return src
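
# Hedged usage sketch (added for illustration; not part of the original module).
# It exercises the `back_to_first_tick` / `keep_first_tick_timestamp` machinery of
# DataSource above: when a field name is passed via `keep_first_tick_timestamp`,
# the original timestamp of the tick propagated from before the query start time
# is preserved in that field. The db/tick type/symbol names are assumptions.
def _example_back_to_first_tick():  # hypothetical helper, never called by the module
    data = otp.DataSource(
        db='SOME_DB', tick_type='TT', symbols='S1',
        back_to_first_tick=60,  # look back up to 60 seconds before the start time
        keep_first_tick_timestamp='FIRST_TS',  # adds the nsectime field 'FIRST_TS'
    )
    return otp.run(data)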
Custom = DataSource  # for backward compatibility: previously we had only Custom
class Symbols(Source):
    """
    Construct a source that returns ticks with information about symbols in a database.
    The SYMBOL_NAME field is populated with symbol names.
    The TICK_TYPE field contains the corresponding tick type
    (enabled by the ``show_tick_type`` parameter).

    Parameters
    ----------
    db: str
        Name of the database in which to search for symbols.
    tick_type: str
        Tick type to use. Default is `ANY`.
    start, end: :py:class:`datetime.datetime`, :py:class:`otp.datetime <onetick.py.datetime>`, \
        :py:class:`onetick.py.adaptive`
        Time interval from which the data should be taken.
    date: :py:class:`datetime.date`
        Alternative way of setting the interval instead of start/end times.
    keep_db: bool
        Flag that indicates whether symbols should have a db prefix.
    pattern: str
        SQL-syntax pattern for symbols. Default is '%'.
    for_tick_type: str
        Fetch only symbols belonging to this tick type, if specified.
    show_tick_type: bool
        Add the TICK_TYPE column with the information about the tick type.
    symbology: str
        The destination symbology for a symbol name translation.
        Translation is performed if the destination symbology is not empty
        and is different from that of the queried database.
    show_original_symbols: bool
        Switches propagation of the original symbol name as the tick field
        ORIGINAL_SYMBOL_NAME if symbol name translation is performed
        (i.e. if `symbology` is set).
        Note that if this parameter is set to True, database symbols with
        missing translations are also propagated.

    Note
    ----
    Additional fields added to Symbols will be converted to symbol parameters.

    See also
    --------
    | :ref:`Symbols guide <Symbols>`
    | **FIND_DB_SYMBOLS** OneTick event processor

    Examples
    --------
    This class can be used to get a list of all symbols in the database:

    >>> symbols = otp.Symbols('NYSE_TAQ', date=otp.dt(2022, 3, 1))
    >>> otp.run(symbols)
            Time SYMBOL_NAME
    0 2022-03-01         AAP
    1 2022-03-01        AAPL

    This class can also be used to specify symbols for the main query:

    >>> symbols = otp.Symbols('NYSE_TAQ', date=otp.dt(2022, 3, 1))
    >>> data = otp.DataSource('NYSE_TAQ', tick_type='TRD', date=otp.dt(2022, 3, 1))
    >>> result = otp.run(data, symbols=symbols)
    >>> result['AAPL']
                         Time  PRICE  SIZE
    0 2022-03-01 00:00:00.000    1.3   100
    1 2022-03-01 00:00:00.001    1.4    10
    2 2022-03-01 00:00:00.002    1.4    50
    >>> result['AAP']
                         Time  PRICE
    0 2022-03-01 00:00:00.000  45.37
    1 2022-03-01 00:00:00.001  45.41

    Additional fields of ``otp.Symbols`` can be used in the main query as symbol parameters:

    >>> symbols = otp.Symbols('SOME_DB', show_tick_type=True, keep_db=True)
    >>> symbols['PARAM'] = symbols['SYMBOL_NAME'] + '__' + symbols['TICK_TYPE']
    >>> data = otp.DataSource('SOME_DB')
    >>> data['S_PARAM'] = data.Symbol.PARAM
    >>> data = otp.merge([data], symbols=symbols)
    >>> otp.run(data)
                         Time  X          S_PARAM
    0 2003-12-01 00:00:00.000  1  SOME_DB::S1__TT
    1 2003-12-01 00:00:00.000 -3  SOME_DB::S2__TT
    2 2003-12-01 00:00:00.001  2  SOME_DB::S1__TT
    3 2003-12-01 00:00:00.001 -2  SOME_DB::S2__TT
    4 2003-12-01 00:00:00.002  3  SOME_DB::S1__TT
    5 2003-12-01 00:00:00.002 -1  SOME_DB::S2__TT
"""_PROPERTIES=Source._PROPERTIES+["_p_db","_p_pattern","_p_start","_p_end","_p_for_tick_type","_p_keep_db"]def__init__(self,db=None,tick_type="ANY",start=utils.adaptive,end=utils.adaptive,date=None,find_params=None,keep_db=False,pattern='%',for_tick_type=None,show_tick_type=False,symbology='',show_original_symbols=False,**kwargs):ifself._try_default_constructor(**kwargs):returnself._p_db=dbself._p_pattern=patternself._p_start=startself._p_end=endself._p_keep_db=keep_dbself._p_for_tick_type=for_tick_typeifdate:ifisinstance(date,ott.datetime)orisinstance(date,ott.date):start=date.startend=date.end_symbol=utils.adaptiveifdb:ifisinstance(db,list):_symbol=[f"{str(_db).split(':')[0]}::"for_dbindb]# noqaelse:_symbol=f"{str(db).split(':')[0]}::"# noqa_find_params=find_paramsiffind_paramsisnotNoneelse{}_find_params.setdefault('pattern',pattern)iffor_tick_type:_find_params['tick_type_field']=for_tick_type_find_params.setdefault('show_tick_type',show_tick_type)_find_params.setdefault('symbology',symbology)_find_params.setdefault('show_original_symbols',show_original_symbols)super().__init__(_symbols=_symbol,_start=start,_end=end,_base_ep_func=lambda:self.base_ep(ep_tick_type=tick_type,keep_db=keep_db,**_find_params),)self.schema['SYMBOL_NAME']=strif_find_params['show_tick_type']:self.schema['TICK_TYPE']=strif_find_params['symbology']and_find_params['show_original_symbols']:self.schema['ORIGINAL_SYMBOL_NAME']=strdefbase_ep(self,ep_tick_type,keep_db,**params):src=Source(otq.FindDbSymbols(**params))src.tick_type(ep_tick_type)src.schema['SYMBOL_NAME']=strifnotkeep_db:src["SYMBOL_NAME"]=src["SYMBOL_NAME"].str.regex_replace('.*::','')returnsrc@staticmethoddefduplicate(obj,db=None):returnSymbols(db=obj._p_dbifdbisNoneelsedb,pattern=obj._p_pattern,start=obj._p_start,end=obj._p_end,keep_db=obj._p_keep_db,for_tick_type=obj._p_for_tick_type)
def default_date_converter(date):
    return pd.to_datetime(date, format='%Y%m%d%H%M%S.%f')


def to_timestamp_nanos(date, date_converter, tz):
    date = date_converter(date)
    if isinstance(date, ott.dt):
        date = date.ts
    else:
        date = pd.to_datetime(date)
    return date.tz_localize(tz)


def LocalCSVTicks(
    path,
    start=utils.adaptive,
    end=utils.adaptive,
    date_converter=default_date_converter,
    additional_date_columns=None,
    converters=None,
    tz=None,
):
    """
    Load ticks from a csv file and create an otp.Ticks object from them.

    Parameters
    ----------
    path: str
        Absolute path to the csv file
    start: datetime object
        Start of the query interval
    end: datetime object
        End of the query interval
    date_converter:
        A converter from string to datetime format; by default applied only to the TIMESTAMP column
    additional_date_columns:
        Other columns to convert to datetime format
    converters:
        Non-default converters for columns from strings
    tz: timezone

    Returns
    -------
    otp.Ticks
    """
    if tz is None:
        tz = configuration.config.tz

    c = {'TIMESTAMP': partial(to_timestamp_nanos, date_converter=date_converter, tz=tz)}
    if converters is not None:
        c.update(converters)
    if additional_date_columns is not None:
        c.update({
            column: partial(to_timestamp_nanos, date_converter=date_converter, tz=tz)
            for column in additional_date_columns
        })

    df = pd.read_csv(path, converters=c)
    df['TS_'] = df['TIMESTAMP']
    df['SYMBOL_NAME'] = df['#SYMBOL_NAME']

    d = df.to_dict(orient='list')
    del d['TIMESTAMP']
    del d['#SYMBOL_NAME']

    ticks = Ticks(d, start=start, end=end)
    ticks['TIMESTAMP'] = ticks['TS_']
    ticks = ticks.drop('TS_')
    return ticks


class SymbologyMapping(Source):
    _PROPERTIES = Source._PROPERTIES + ["_p_dest_symbology"]

    def __init__(
        self,
        dest_symbology: str = None,
        tick_type: str = None,
        start=utils.adaptive,
        end=utils.adaptive,
        symbols=utils.adaptive,
        **desired_schema,
    ):
        if self._try_default_constructor(**desired_schema):
            return
        if not dest_symbology or not tick_type:
            raise TypeError("Missing required argument: 'dest_symbology' or 'tick_type'")

        self._p_dest_symbology = dest_symbology
        super().__init__(
            _symbols=symbols,
            _start=start,
            _end=end,
            _base_ep_func=lambda: self.base_ep(dest_symbology, tick_type),
            **desired_schema,
        )
        self.schema['MAPPED_SYMBOL_NAME'] = str
        self.schema['END_DATETIME'] = ott.nsectime

    @property
    def dest_symbology(self):
        return self._p_dest_symbology

    def base_ep(self, dest_symbology, tick_type):
        src = Source(otq.SymbologyMapping(dest_symbology=dest_symbology))
        src.tick_type(tick_type)
        return src


class SplitQueryOutputBySymbol(Source):
    def __init__(
        self,
        query=None,
        symbol_field=None,
        single_invocation=False,
        db=utils.adaptive_to_default,
        tick_type=utils.adaptive,
        start=utils.adaptive,
        end=utils.adaptive,
        symbols=utils.adaptive,
        **desired_schema,
    ):
        if self._try_default_constructor(**desired_schema):
            return

        if isinstance(query, Source):
            # TODO: support already existing queries
            query = query.copy()
            otq_query = query._save_as_tmp_otq()
            q_start, q_end, _ = query._get_date_range()
            if start is utils.adaptive and end is utils.adaptive:
                start, end = q_start, q_end
        else:
            raise Exception('Unsupported type of the `query` parameter is specified')

        if db is utils.adaptive_to_default:
            db = configuration.config.get('default_db')
        if tick_type is utils.adaptive:
            tick_type = 'SPLIT_BY_SYMBOL'

        super().__init__(
            _symbols=symbols,
            _start=start,
            _end=end,
            _base_ep_func=partial(self.build, db, tick_type, symbol_field, otq_query, single_invocation),
            **desired_schema,
        )

    def build(self, db, tick_type, symbol_field_name, otq_query, single_invocation):
        src = Source(
            otq.SplitQueryOutputBySymbol(
                otq_query=otq_query,
                symbol_field_name=str(symbol_field_name),
                ensure_single_invocation=single_invocation,
            )
        )
        if db:
            tick_type = str(db) + f'::{tick_type}'
        src.tick_type(tick_type)
        return src
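
# Hedged sketch of LocalCSVTicks defined above: the csv file is expected to contain
# a '#SYMBOL_NAME' column and a 'TIMESTAMP' column parsable by `date_converter`.
# The file path and the extra datetime column name are assumptions.
def _example_local_csv_ticks():  # hypothetical helper, never called by the module
    ticks = LocalCSVTicks(
        '/tmp/ticks.csv',  # hypothetical file
        start=otp.dt(2022, 3, 1),
        end=otp.dt(2022, 3, 2),
        additional_date_columns=['EXPIRATION'],  # hypothetical extra datetime column
    )
    return otp.run(ticks)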
def by_symbol(
    src: Source,
    symbol_field,
    single_invocation=None,
    db=utils.adaptive_to_default,
    tick_type=utils.adaptive,
    start=utils.adaptive,
    end=utils.adaptive,
) -> Source:
    """
    Create a separate data series for each unique value of ``symbol_field`` in the output of ``src``.
    ``src`` must specify enough parameters to be run (e.g., symbols, query range).
    A typical use case is splitting a single data series (e.g., from a CSV file)
    into separate data series by symbol.
    This method is a source.

    Parameters
    ----------
    src: Source
        a query whose output is to be split by ``symbol_field``
    symbol_field: str
        the name of the field carrying the symbol name in the ``src`` query
    single_invocation: bool, optional
        ``True`` means that the ``src`` query is run once and the result is stored in memory,
        speeding up the execution.
        ``False`` means that the ``src`` query is run for every symbol of the query,
        saving memory but slowing down query execution.
        ``None`` means that this flag will be chosen automatically:
        ``True`` for CSV sources and ``False`` for other sources.
    db: str, optional
        Database for running the query. Doesn't affect the ``src`` query.
        The default value is ``otp.config['default_db']``.
    tick_type: str, optional
        Tick type for the query. Doesn't affect the ``src`` query.
    start: otp.dt, optional
        By default it is taken from the ``src`` start time
    end: otp.dt, optional
        By default it is taken from the ``src`` end time

    See also
    --------
    **SPLIT_QUERY_OUTPUT_BY_SYMBOL** OneTick event processor

    Examples
    --------
    >>> executions = otp.CSV(  # doctest: +SKIP
    ...     otp.utils.file(os.path.join(cur_dir, 'data', 'example_events.csv')),
    ...     converters={"time_number": lambda c: c.apply(otp.nsectime)},
    ...     timestamp_name="time_number",
    ...     start=otp.dt(2022, 7, 1),
    ...     end=otp.dt(2022, 7, 2),
    ...     order_ticks=True
    ... )[['stock', 'px']]
    >>> csv = otp.by_symbol(executions, 'stock')  # doctest: +SKIP
    >>> trd = otp.DataSource(  # doctest: +SKIP
    ...     db='NYSE_TAQ',
    ...     tick_type='TRD',
    ...     start=otp.dt(2022, 7, 1),
    ...     end=otp.dt(2022, 7, 2)
    ... )[['PRICE', 'SIZE']]
    >>> data = otp.funcs.join_by_time([csv, trd])  # doctest: +SKIP
    >>> result = otp.run(data, symbols=executions.distinct(keys='stock')[['stock']], concurrency=8)  # doctest: +SKIP
    >>> result['THG']  # doctest: +SKIP
                               Time stock      px   PRICE  SIZE
    0 2022-07-01 11:37:56.432947200   THG  148.02  146.48     1
    >>> result['TFX']  # doctest: +SKIP
                               Time stock      px   PRICE  SIZE
    0 2022-07-01 11:39:45.882808576   TFX  255.61  251.97     1
    >>> result['BURL']  # doctest: +SKIP
                               Time stock      px   PRICE  SIZE
    0 2022-07-01 11:42:35.125718016  BURL  137.53  135.41     2
    """
    if single_invocation is None:
        single_invocation = isinstance(src, CSV)

    result = SplitQueryOutputBySymbol(
        src,
        symbol_field=symbol_field,
        single_invocation=single_invocation,
        db=db,
        tick_type=tick_type,
        start=start,
        end=end,
    )
    result.schema.set(**src.schema)
    return result
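
# Hedged illustration of the `single_invocation` default logic above: CSV-backed
# sources are materialized once, anything else is re-run per symbol. The file path
# and field name are assumptions.
def _example_by_symbol_default():  # hypothetical helper, never called by the module
    executions = otp.CSV('/tmp/example_events.csv')  # hypothetical file
    # single_invocation is left as None, so it resolves to True for a CSV source
    return otp.by_symbol(executions, 'stock')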
@docstring(parameters=OB_SNAPSHOT_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshot(*args, **kwargs):
    """
    Construct a source providing an order book snapshot for a given ``db``.

    This is just a shortcut for otp.DataSource + otp.agg.ob_snapshot.

    See also
    --------
    | :class:`onetick.py.DataSource`
    | :meth:`onetick.py.Source.ob_snapshot`
    | :func:`onetick.py.agg.ob_snapshot`
    | **OB_SNAPSHOT** OneTick event processor

    Examples
    --------
    >>> data = otp.ObSnapshot(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1)  # doctest: +SKIP
    >>> otp.run(data)  # doctest: +SKIP
            Time  PRICE             UPDATE_TIME  SIZE  LEVEL  BUY_SELL_FLAG
    0 2003-12-04    2.0 2003-12-01 00:00:00.003     6      1              1
    1 2003-12-04    5.0 2003-12-01 00:00:00.004     7      1              0
    """
    aggregation_params = {
        param.name: kwargs.pop(param.name, param.default)
        for _, param in OB_SNAPSHOT_DOC_PARAMS
    }
    src = otp.DataSource(*args, **kwargs)
    return otp.agg.ob_snapshot(**aggregation_params).apply(src)
@docstring(parameters=OB_SNAPSHOT_WIDE_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshotWide(*args, **kwargs):
    """
    Construct a source providing an order book wide snapshot for a given ``db``.

    This is just a shortcut for otp.DataSource + otp.agg.ob_snapshot_wide.

    See also
    --------
    | :class:`onetick.py.DataSource`
    | :meth:`onetick.py.Source.ob_snapshot_wide`
    | :func:`onetick.py.agg.ob_snapshot_wide`
    | **OB_SNAPSHOT_WIDE** OneTick event processor

    Examples
    --------
    >>> data = otp.ObSnapshotWide(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1)  # doctest: +SKIP
    >>> otp.run(data)  # doctest: +SKIP
            Time  BID_PRICE         BID_UPDATE_TIME  BID_SIZE  ASK_PRICE         ASK_UPDATE_TIME  ASK_SIZE  LEVEL
    0 2003-12-03        5.0 2003-12-01 00:00:00.004         7        2.0 2003-12-01 00:00:00.003         6      1
    """
    aggregation_params = {
        param.name: kwargs.pop(param.name, param.default)
        for _, param in OB_SNAPSHOT_WIDE_DOC_PARAMS
    }
    src = otp.DataSource(*args, **kwargs)
    return otp.agg.ob_snapshot_wide(**aggregation_params).apply(src)
@docstring(parameters=OB_SNAPSHOT_FLAT_DOC_PARAMS + DATA_SOURCE_DOC_PARAMS)
def ObSnapshotFlat(*args, **kwargs):
    """
    Construct a source providing an order book flat snapshot for a given ``db``.

    This is just a shortcut for otp.DataSource + otp.agg.ob_snapshot_flat.

    See also
    --------
    | :class:`onetick.py.DataSource`
    | :meth:`onetick.py.Source.ob_snapshot_flat`
    | :func:`onetick.py.agg.ob_snapshot_flat`
    | **OB_SNAPSHOT_FLAT** OneTick event processor

    Examples
    --------
    >>> data = otp.ObSnapshotFlat(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1)  # doctest: +SKIP
    >>> otp.run(data)  # doctest: +SKIP
            Time  BID_PRICE1        BID_UPDATE_TIME1  BID_SIZE1  ASK_PRICE1        ASK_UPDATE_TIME1  ASK_SIZE1
    0 2003-12-03         5.0 2003-12-01 00:00:00.004          7         2.0 2003-12-01 00:00:00.003          6
    """
    aggregation_params = {
        param.name: kwargs.pop(param.name, param.default)
        for _, param in OB_SNAPSHOT_FLAT_DOC_PARAMS
    }
    src = otp.DataSource(*args, **kwargs)
    return otp.agg.ob_snapshot_flat(**aggregation_params).apply(src)
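
# Hedged sketch of the equivalence implemented by the three Ob* shortcuts above,
# shown for ObSnapshot: the shortcut splits kwargs between otp.DataSource and the
# aggregation. Database, tick type and symbol names are assumptions.
def _example_ob_snapshot_equivalence():  # hypothetical helper, never called by the module
    via_shortcut = otp.ObSnapshot(db='SOME_DB', tick_type='PRL', symbols='AA', max_levels=1)
    manual = otp.agg.ob_snapshot(max_levels=1).apply(
        otp.DataSource(db='SOME_DB', tick_type='PRL', symbols='AA')
    )
    return via_shortcut, manual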