otp.DataSource

class DataSource(db=None, symbol=utils.adaptive, tick_type=utils.adaptive, start=utils.adaptive, end=utils.adaptive, date=None, schema_policy=None, guess_schema=utils.adaptive, identify_input_ts=None, back_to_first_tick=False, keep_first_tick_timestamp=0, max_back_ticks_to_prepend=None, where_clause_for_back_ticks=1, symbols=None, presort=None, batch_size=utils.adaptive, concurrency=None, schema=utils.default, symbol_date=None, **kwargs)
Bases: `Source`

Construct a source providing data from a given `db`.

Warning

The default value of the `schema_policy` parameter enables automatic deduction of the data schema, which is strongly discouraged in production code. For details, see Schema deduction mechanism.

Parameters:
- db (`str`, list of `str`, `otp.DB`, default=None) – Name(s) of the database or the database object(s).
  When passing a single database, the tick type can be embedded in the name using the `'DB_NAME::TICK_TYPE'` format (e.g., `'US_COMP::TRD'`).
  When passing a list of databases, each entry can include its own tick type (e.g., `['US_COMP::TRD', 'CME::QTE']`). Entries that lack a tick type take it from the `tick_type` parameter.
  When `None`, the database is expected to come as part of the symbol name (e.g., `'DB::SYMBOL'`), and `tick_type` must be set explicitly.
- symbol (`str`, list of `str`, `Source`, `query`, `eval query`, default=`onetick.py.adaptive`) – Symbol(s) from which data should be taken.
- tick_type (`str`, list of `str`, default=`onetick.py.adaptive`) – Tick type of the data (e.g., `'TRD'` for trades, `'QTE'` for quotes).
  When `adaptive` (default), the tick type is auto-detected from the database. If auto-detection fails or multiple databases are specified, it defaults to `'TRD'`.
  Can be a list of strings (e.g., `['TRD', 'QTE']`) to merge multiple tick types from the same database into a single data flow.
- start (`datetime.datetime`, `otp.datetime`, `onetick.py.adaptive`, default=`onetick.py.adaptive`) – Start of the interval from which the data should be taken. With the default, `onetick.py.adaptive`, the final query deduces the time limits from the rest of the graph.
- end (`datetime.datetime`, `otp.datetime`, `onetick.py.adaptive`, default=`onetick.py.adaptive`) – End of the interval from which the data should be taken. With the default, `onetick.py.adaptive`, the final query deduces the time limits from the rest of the graph.
- date (`datetime.datetime`, `otp.datetime`, default=None) – Specifies a whole day instead of explicit `start` and `end` parameters. If it is set together with `start` and `end`, the latter two are ignored.
- schema_policy ('tolerant', 'tolerant_strict', 'fail', 'fail_strict', 'manual', 'manual_strict', default=`onetick.py.adaptive`) – Schema deduction policy:
  - 'tolerant' (default): The resulting schema is a combination of `schema` and the database schema. If the database schema can be deduced, it is checked for type compatibility with `schema`, and ValueError is raised if the check fails. With this policy, the database is also scanned 5 days back to find the schema, which helps when the database is misconfigured or the query date is a holiday.
  - 'tolerant_strict': The resulting schema is `schema` if it is not empty; otherwise, the database schema is used. If the database schema can be deduced, it is checked for fields missing from `schema` and for type compatibility with `schema`, and ValueError is raised if either check fails. With this policy, the database is also scanned 5 days back to find the schema, which helps when the database is misconfigured or the query date is a holiday.
  - 'fail': The same as 'tolerant', but raises an exception if the database schema can't be deduced.
  - 'fail_strict': The same as 'tolerant_strict', but raises an exception if the database schema can't be deduced.
  - 'manual': The resulting schema is a combination of `schema` and the database schema. Compatibility with the database schema is not checked.
  - 'manual_strict': The resulting schema is exactly `schema`. Compatibility with the database schema is not checked. Fields specified in `schema` that do not exist in the database are set to the default value for their type (0 for integers, NaN for floats, an empty string for strings, the epoch for datetimes).
  The default value is `onetick.py.adaptive` (if the deprecated parameter `guess_schema` is not set). If `guess_schema` is True, the value is 'fail'; if False, 'manual'. If `schema_policy` is set to `None`, the default value is 'tolerant'. The default value can be changed with the `otp.config.default_schema_policy` configuration parameter.
  If you set `schema` manually when creating a DataSource instance and don't set `schema_policy`, it is automatically set to 'manual'.
- guess_schema (`bool`, default=None) – Deprecated since version 1.3.16. Use the `schema_policy` parameter instead.
  If `guess_schema` is True, the `schema_policy` value is 'fail'; if False, 'manual'.
- identify_input_ts (`bool`, default=False) – If True, adds `SYMBOL_NAME` and `TICK_TYPE` fields to every output tick, identifying which symbol and tick type each tick came from. Especially useful when merging multiple symbols, to distinguish the source of each tick.
- back_to_first_tick (`int`, offset, `otp.expr`, `Operation`, default=0) – Determines how far back (in seconds) to search for the latest tick before the `start` time. If one is found, it is prepended to the output with its timestamp changed to the `start` time. This is useful for initializing state (e.g., getting the last known price before market open).
  Accepts an integer (seconds), a time offset like `otp.Day(1)` or `otp.Hour(2)`, or an `otp.expr` for dynamic values.
  Note: the value is truncated to whole seconds, so `otp.Millis(999)` becomes 0. Use it with `keep_first_tick_timestamp` to preserve the original tick time, and with `max_back_ticks_to_prepend` to retrieve more than one historical tick.
- keep_first_tick_timestamp (`str`, default=None) – Name of a new `nsectime` field that stores the original timestamp of prepended ticks. For ticks within the query interval, this field equals the `Time` field. For ticks prepended by `back_to_first_tick`, it contains their true historical timestamp (before it was overwritten with the `start` time).
  This parameter is ignored if `back_to_first_tick` is 0.
- max_back_ticks_to_prepend (`int`, default=1) – Maximum number of the most recent ticks before the `start` time to prepend to the output. Only used when `back_to_first_tick` is non-zero. All prepended ticks have their timestamp changed to the `start` time. Must be at least 1.
  For example, to get the last 5 trades before market open, set `back_to_first_tick=otp.Day(1)` and `max_back_ticks_to_prepend=5`.
- where_clause_for_back_ticks (`onetick.py.core.column_operations.base.Raw`, default=None) – A filter expression applied only to ticks found during the backward search (controlled by `back_to_first_tick`). Ticks for which this expression evaluates to False are skipped and not prepended.
  Must be an `otp.raw` expression with `dtype=bool`. For example, `otp.raw('SIZE>=100', dtype=bool)` keeps only ticks with SIZE >= 100.
- symbols (`str`, list of `str`, `Source`, `query`, `eval query`, `onetick.query.GraphQuery`, default=None) – Symbol(s) from which data should be taken. Alias for the `symbol` parameter; takes precedence over it.
- presort (`bool`, default=`onetick.py.adaptive`) – Controls whether to use a PRESORT event processor when querying multiple bound symbols. PRESORT parallelizes data fetching across symbols and merges the results in timestamp order, which is generally faster than sequential MERGE for large symbol lists.
  Applicable only when `symbols` is set. By default, True when `symbols` is set, False otherwise. Set to False to use sequential MERGE instead.
- batch_size (`int`, default=None) – Number of symbols to process in each batch during `presort` execution. Larger batch sizes reduce overhead but use more memory. Only applicable when `presort` is True.
  By default, the value from `otp.config.default_batch_size` is used.
- concurrency (`int`, default=`onetick.py.utils.default`) – Number of CPU cores to use for the `presort`. By default, the value is inherited from the query in which this PRESORT is used.
  For the main query, it may be specified in the `concurrency` parameter of the `run()` method (which defaults to `otp.config.default_concurrency`).
  For auxiliary queries (like first-stage queries), an empty value means OneTick's default of 1. If `otp.config.presort_force_default_concurrency` is set, the default concurrency value is used in all PRESORT EPs in all queries.
- schema (`Optional[Dict[str, type]]`, default=None) – Dict of column name to column type pairs that the source is expected to have.
  Supported types: `int`, `float`, `str`, `otp.string[N]`, `otp.varstring[N]`, `otp.nsectime`, `otp.msectime`, `otp.decimal`, `bytes`. If the type of a column is irrelevant, provide `None` as the type.
  How the schema is used depends on `schema_policy`. When `schema` is set and `schema_policy` is not explicitly provided, `schema_policy` defaults to 'manual'.
- symbol_date (`otp.datetime` or `datetime.datetime` or `int`, default=None) – Date used for symbol resolution in date-dependent symbologies, where the same symbol identifier can map to different instruments on different dates.
  Accepts `otp.datetime`, `datetime.datetime`, or an integer in the `YYYYMMDD` format (e.g., `20220301`).
  Can only be specified when `symbols` is set. If `symbols` is a plain list of strings, it is internally converted to a first-stage query with the given `symbol_date`.
- kwargs (`type[str]`) – Deprecated. Use `schema` instead. Pairs of <column name> -> <column type> that the source is expected to have. If the type is irrelevant, provide `None` as the type in question.
Note

If the interval set for this `DataSource` via the `start`/`end` or `date` parameters does not match the intervals of other `Source` objects used in the query, or does not match the whole query interval, then `modify_query_times()` is applied to this `DataSource` with the specified interval as its start and end times.

If the `symbols` parameter is omitted, you need to specify unbound symbols for the query in the `symbols` parameter of the `onetick.py.run()` function.

If the `symbols` parameter is set, `otp.merge` is used to merge all passed bound symbols. In this case, you don't need to specify unbound symbols in the `onetick.py.run()` call.

Bound and unbound symbols can't be specified at the same time.
Examples
Query a single symbol from a database, specifying `db` as a string:

```python
>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1')
>>> otp.run(data)
                     Time  X
0 2003-12-01 00:00:00.000  1
1 2003-12-01 00:00:00.001  2
2 2003-12-01 00:00:00.002  3
```
`db` can also be a `DB` object:

```python
import onetick.py as otp

db = otp.DB('US_COMP')
data = otp.DataSource(db=db, tick_type='TRD', symbols='AAPL')
otp.run(data)
```
`db` can be a list to merge data from multiple databases. Each entry can embed its tick type using the `'DB_NAME::TICK_TYPE'` format:

```python
data = otp.DataSource(
    db=['US_COMP::TRD', 'CME::TRD'],
    symbols='AAPL',
)
otp.run(data)
```
When some databases in the list lack a tick type, `tick_type` fills them in:

```python
# Equivalent to db=['US_COMP::TRD', 'CME::TRD']
data = otp.DataSource(
    db=['US_COMP', 'CME'],
    tick_type='TRD',
    symbols='AAPL',
)
```
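The fill-in rule for `db` entries can be pictured with a small plain-Python sketch. The helper `split_db_entries` is hypothetical and not part of onetick.py; it only mirrors the rule described above, assuming each entry is split on its first `'::'`. As a simplification, it raises an error where the library would instead auto-detect the tick type.

```python
from typing import List, Optional, Tuple, Union


def split_db_entries(
    db: Union[str, List[str]], tick_type: Optional[str] = None
) -> List[Tuple[str, str]]:
    """Hypothetical helper: turn db entries into (db_name, tick_type) pairs.

    Entries written as 'DB_NAME::TICK_TYPE' keep their own tick type;
    entries without one take the value of ``tick_type``.
    """
    entries = [db] if isinstance(db, str) else list(db)
    result = []
    for entry in entries:
        name, sep, own_tick_type = entry.partition('::')
        if sep:
            result.append((name, own_tick_type))
        elif tick_type is not None:
            result.append((name, tick_type))
        else:
            # Simplification: the real source auto-detects the tick type here
            raise ValueError(f'No tick type given for database {name!r}')
    return result


# Both calls describe the same (database, tick type) pairs
print(split_db_entries(['US_COMP::TRD', 'CME::TRD']))
print(split_db_entries(['US_COMP', 'CME'], tick_type='TRD'))
```

Both calls print `[('US_COMP', 'TRD'), ('CME', 'TRD')]`, which is the equivalence stated in the example comment.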
The `symbols` parameter can be a list. In this case, the specified symbols are merged into a single data flow:

```python
>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=['S1', 'S2'])
>>> otp.run(data)
                     Time  X
0 2003-12-01 00:00:00.000  1
1 2003-12-01 00:00:00.000 -3
2 2003-12-01 00:00:00.001  2
3 2003-12-01 00:00:00.001 -2
4 2003-12-01 00:00:00.002  3
5 2003-12-01 00:00:00.002 -1
```
The `identify_input_ts` parameter can be used to automatically add fields with the symbol name and tick type to each tick:

```python
>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=['S1', 'S2'], identify_input_ts=True)
>>> otp.run(data)
                     Time SYMBOL_NAME TICK_TYPE  X
0 2003-12-01 00:00:00.000          S1        TT  1
1 2003-12-01 00:00:00.000          S2        TT -3
2 2003-12-01 00:00:00.001          S1        TT  2
3 2003-12-01 00:00:00.001          S2        TT -2
4 2003-12-01 00:00:00.002          S1        TT  3
5 2003-12-01 00:00:00.002          S2        TT -1
```
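The ordering in this output (time order, with `S1` before `S2` on equal timestamps) can be reproduced with a plain-Python sketch of a timestamp-ordered merge of already sorted per-symbol streams. This is only an illustration using `heapq.merge`, which behaves like a stable sort and so takes ticks from earlier streams first on ties; it is not how OneTick performs the merge internally, and the `tag` helper is hypothetical.

```python
import heapq
from datetime import datetime, timedelta

# Per-symbol tick streams with the values from the example above,
# each already sorted by time: (Time, X)
base = datetime(2003, 12, 1)
streams = {
    'S1': [(base + timedelta(milliseconds=i), x) for i, x in enumerate([1, 2, 3])],
    'S2': [(base + timedelta(milliseconds=i), x) for i, x in enumerate([-3, -2, -1])],
}


def tag(symbol, tick_type, ticks):
    """Hypothetical helper: attach SYMBOL_NAME and TICK_TYPE to every tick,
    which is what identify_input_ts=True adds to the output."""
    for time, x in ticks:
        yield {'Time': time, 'SYMBOL_NAME': symbol, 'TICK_TYPE': tick_type, 'X': x}


# heapq.merge keeps time order; ties are taken from earlier streams first
merged = list(heapq.merge(
    *(tag(symbol, 'TT', ticks) for symbol, ticks in streams.items()),
    key=lambda tick: tick['Time'],
))
for tick in merged:
    print(tick['Time'], tick['SYMBOL_NAME'], tick['TICK_TYPE'], tick['X'])
```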
A `Source` can also be passed as `symbols`. In that case, the specially named column `SYMBOL_NAME` is used as the symbol name, and all other columns are passed as symbol parameters:

```python
>>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'])
>>> data = otp.DataSource(db='SOME_DB', symbols=symbols, tick_type='TT')
>>> otp.run(data)
                     Time  X
0 2003-12-01 00:00:00.000  1
1 2003-12-01 00:00:00.000 -3
2 2003-12-01 00:00:00.001  2
3 2003-12-01 00:00:00.001 -2
4 2003-12-01 00:00:00.002  3
5 2003-12-01 00:00:00.002 -1
```
Use `date` to query a full day of data. It sets `start` to the beginning of the day and `end` to the last millisecond of the day:

```python
>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 1))
>>> otp.run(data)
                     Time  PRICE  SIZE
0 2022-03-01 00:00:00.000    1.3   100
1 2022-03-01 00:00:00.001    1.4    10
2 2022-03-01 00:00:00.002    1.4    50
```
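The interval that `date` stands for can be computed with the standard library. The helper `day_interval` below is hypothetical and uses `datetime.datetime` in place of `otp.datetime`; it simply encodes the rule stated above (midnight to the last millisecond of the same day).

```python
from datetime import datetime, timedelta


def day_interval(day: datetime):
    """Hypothetical helper mirroring the ``date`` description:
    start is midnight of the given day, end is its last millisecond."""
    start = datetime(day.year, day.month, day.day)
    end = start + timedelta(days=1) - timedelta(milliseconds=1)
    return start, end


# The time of day passed in is irrelevant; only the calendar day matters
start, end = day_interval(datetime(2022, 3, 1, 15, 45))
print(start)  # 2022-03-01 00:00:00
print(end)    # 2022-03-01 23:59:59.999000
```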
Alternatively, use `start` and `end` for a custom time interval. Standard `datetime.datetime` objects are also accepted:

```python
import datetime

data = otp.DataSource(
    db='US_COMP',
    tick_type='TRD',
    symbols='AAPL',
    start=datetime.datetime(2022, 3, 1, 9, 30),
    end=datetime.datetime(2022, 3, 1, 16, 0),
)
otp.run(data)
```
Or using `otp.datetime`:

```python
data = otp.DataSource(
    db='US_COMP',
    tick_type='TRD',
    symbols='AAPL',
    start=otp.dt(2022, 3, 1, 9, 30),
    end=otp.dt(2022, 3, 1, 16, 0),
)
```
If `date` is set together with `start`/`end`, `date` takes precedence.

`tick_type` can be a list to merge data from multiple tick types:

```python
# Merge trades and quotes from the same database;
# identify_input_ts=True shows which tick type each row came from
data = otp.DataSource(
    db='US_COMP',
    tick_type=['TRD', 'QTE'],
    symbols='AAPL',
    identify_input_ts=True,
)
```
The default schema policy is 'tolerant' (unless you specify the `schema` parameter and leave `schema_policy` at its default value, in which case it is set to 'manual'):

```python
>>> data = otp.DataSource(
...     db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 1),
... )
>>> data.schema
{'PRICE': <class 'float'>, 'SIZE': <class 'int'>}
```
With 'tolerant', a type mismatch between `schema` and the database schema raises ValueError:

```python
>>> data = otp.DataSource(
...     db='US_COMP', tick_type='TRD', symbols='AAPL', schema={'PRICE': int},
...     schema_policy='tolerant', date=otp.dt(2022, 3, 1),
... )
Traceback (most recent call last):
  ...
ValueError: Database(-s) US_COMP::TRD schema field PRICE has type <class 'float'>, but <class 'int'> was requested
```
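The 'tolerant' combination and type check can be sketched in plain Python. The function `merge_tolerant` is hypothetical, and it simplifies "type-compatible" to exact type identity, which is an assumption: the library's real compatibility rules are not spelled out here. The error message is modeled on the one shown above.

```python
from typing import Dict, Optional


def merge_tolerant(schema: Dict[str, Optional[type]],
                   db_schema: Dict[str, type],
                   source: str = 'US_COMP::TRD') -> Dict[str, Optional[type]]:
    """Hypothetical sketch of the 'tolerant' policy: combine both schemas,
    rejecting fields whose requested type differs from the database type."""
    for name, requested in schema.items():
        actual = db_schema.get(name)
        # Simplification: "compatible" is treated as "exactly the same type"
        if requested is not None and actual is not None and requested is not actual:
            raise ValueError(
                f'Database(-s) {source} schema field {name} has type {actual}, '
                f'but {requested} was requested'
            )
    merged: Dict[str, Optional[type]] = dict(db_schema)
    for name, requested in schema.items():
        # None-typed fields keep the database type when the database has one
        if requested is not None or name not in merged:
            merged[name] = requested
    return merged


print(merge_tolerant({'PRICE': float}, {'PRICE': float, 'SIZE': int}))
```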
Schema policy 'manual' uses exactly `schema`:

```python
>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', schema={'PRICE': float},
...                       date=otp.dt(2022, 3, 1), schema_policy='manual')
>>> data.schema
{'PRICE': <class 'float'>}
```
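How the effective policy is chosen when `schema_policy` is left out can be summarized as a small decision function. `resolve_schema_policy` and the `ADAPTIVE` sentinel are hypothetical stand-ins, not onetick.py names; the rules come from the parameter descriptions, while the precedence of the deprecated `guess_schema` over an explicit `schema_policy`, and treating an empty `schema` as unset, are assumptions.

```python
from typing import Dict, Optional

ADAPTIVE = object()  # stand-in for onetick.py's "adaptive" default (assumption)


def resolve_schema_policy(schema_policy=ADAPTIVE,
                          guess_schema: Optional[bool] = None,
                          schema: Optional[Dict[str, type]] = None,
                          config_default: str = 'tolerant') -> str:
    """Hypothetical sketch of the default-resolution rules for schema_policy.

    ``config_default`` plays the role of otp.config.default_schema_policy.
    """
    if guess_schema is not None:           # deprecated parameter (assumed to win)
        return 'fail' if guess_schema else 'manual'
    if schema_policy is None:
        return 'tolerant'
    if schema_policy is ADAPTIVE:
        # A manually set schema switches the default to 'manual'
        return 'manual' if schema else config_default
    return schema_policy


print(resolve_schema_policy())                          # tolerant
print(resolve_schema_policy(schema={'PRICE': float}))   # manual
print(resolve_schema_policy(guess_schema=True))         # fail
```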
Schema policy 'fail' raises an exception if the schema cannot be deduced, for example when the database has no ticks for the requested date:

```python
>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2021, 3, 1),
...                       schema_policy='fail')
Traceback (most recent call last):
  ...
ValueError: No ticks found in database(-s) US_COMP::TRD
```
Schema policy 'tolerant_strict' uses `schema` if provided; otherwise, it falls back to the database schema. It still validates type compatibility:

```python
>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL',
...                       schema={'PRICE': float}, date=otp.dt(2022, 3, 1),
...                       schema_policy='tolerant_strict')
>>> data.schema
{'PRICE': <class 'float'>}
```
Schema policy 'manual_strict' uses exactly the provided `schema`; no database schema is consulted. Fields that don't exist in the database get default values (0 for `int`, NaN for `float`, an empty string for `str`):

```python
data = otp.DataSource(
    db='US_COMP',
    tick_type='TRD',
    symbols='AAPL',
    schema={'PRICE': float, 'CUSTOM_FLAG': int},
    schema_policy='manual_strict',
)
# CUSTOM_FLAG will be 0 for all ticks since it doesn't exist in the database
```
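The default-filling rule for 'manual_strict' can be illustrated on plain dictionaries. The `DEFAULTS` mapping and `fill_missing` helper are hypothetical; standard Python types stand in for OneTick's own types, and a naive `datetime(1970, 1, 1)` stands in for the epoch without modeling OneTick's time zone handling.

```python
import math
from datetime import datetime

# Defaults described for 'manual_strict' when a field is absent from the
# database; this mapping is an illustration, not library code.
DEFAULTS = {
    int: 0,
    float: math.nan,
    str: '',
    datetime: datetime(1970, 1, 1),  # the epoch (time zones not modeled)
}


def fill_missing(tick: dict, schema: dict) -> dict:
    """Hypothetical helper: return a copy of ``tick`` with every schema field
    present, using the type's default value for fields the tick lacks."""
    out = dict(tick)
    for name, dtype in schema.items():
        out.setdefault(name, DEFAULTS[dtype])
    return out


tick = fill_missing({'PRICE': 1.3}, {'PRICE': float, 'CUSTOM_FLAG': int, 'NOTE': str})
print(tick)  # {'PRICE': 1.3, 'CUSTOM_FLAG': 0, 'NOTE': ''}
```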
Schema policy 'fail_strict' is like 'tolerant_strict', but raises an exception when the database schema cannot be determined.

The `schema` parameter accepts various types. Use `None` for columns whose type is irrelevant:

```python
data = otp.DataSource(
    db='US_COMP',
    tick_type='TRD',
    symbols='AAPL',
    schema={
        'PRICE': float,              # 64-bit float
        'SIZE': int,                 # 64-bit integer
        'EXCHANGE': str,             # string (default length 64)
        'COND': otp.string[4],       # fixed-length string of 4 chars
        'MEMO': otp.varstring[256],  # variable-length string up to 256 chars
        'TRADE_TIME': otp.nsectime,  # nanosecond-precision timestamp
        'OTHER': None,               # type will be inferred from database
    },
)
```
`back_to_first_tick` sets how far back to look for the latest tick before the `start` time:

```python
>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       back_to_first_tick=otp.Day(1))
>>> otp.run(data)
                     Time  PRICE  SIZE
0 2022-03-02 00:00:00.000    1.4    50
1 2022-03-02 00:00:00.000    1.0   100
2 2022-03-02 00:00:00.001    1.1   101
3 2022-03-02 00:00:00.002    1.2   102
```
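Because the look-back is counted in whole seconds, sub-second offsets vanish. The sketch below uses `datetime.timedelta` as a stand-in for `otp.Day`, `otp.Hour` and `otp.Millis`; the helper `back_to_first_tick_seconds` is hypothetical and only illustrates the truncation described for the parameter.

```python
from datetime import timedelta


def back_to_first_tick_seconds(offset: timedelta) -> int:
    """Hypothetical helper: express a look-back offset in whole seconds,
    dropping any fractional part (timedelta // timedelta is an int)."""
    return offset // timedelta(seconds=1)


print(back_to_first_tick_seconds(timedelta(days=1)))            # 86400
print(back_to_first_tick_seconds(timedelta(hours=2)))           # 7200
print(back_to_first_tick_seconds(timedelta(milliseconds=999)))  # 0, i.e. no look-back
```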
`keep_first_tick_timestamp` shows the original timestamp of a tick taken from before the start time of the query:

```python
>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       back_to_first_tick=otp.Day(1), keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
>>> otp.run(data)
                     Time        ORIGIN_TIMESTAMP  PRICE  SIZE
0 2022-03-02 00:00:00.000 2022-03-01 00:00:00.002    1.4    50
1 2022-03-02 00:00:00.000 2022-03-02 00:00:00.000    1.0   100
2 2022-03-02 00:00:00.001 2022-03-02 00:00:00.001    1.1   101
3 2022-03-02 00:00:00.002 2022-03-02 00:00:00.002    1.2   102
```
`max_back_ticks_to_prepend` is used with `back_to_first_tick` when more than one tick before the start time should be retrieved:

```python
>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       max_back_ticks_to_prepend=2, back_to_first_tick=otp.Day(1),
...                       keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
>>> otp.run(data)
                     Time        ORIGIN_TIMESTAMP  PRICE  SIZE
0 2022-03-02 00:00:00.000 2022-03-01 00:00:00.001    1.4    10
1 2022-03-02 00:00:00.000 2022-03-01 00:00:00.002    1.4    50
2 2022-03-02 00:00:00.000 2022-03-02 00:00:00.000    1.0   100
3 2022-03-02 00:00:00.001 2022-03-02 00:00:00.001    1.1   101
4 2022-03-02 00:00:00.002 2022-03-02 00:00:00.002    1.2   102
```
`where_clause_for_back_ticks` filters the ticks found before the start time. Here the tick with SIZE 10 is skipped, so the two most recent ticks with SIZE >= 50 are prepended:

```python
data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
                      where_clause_for_back_ticks=otp.raw('SIZE>=50', dtype=bool),
                      back_to_first_tick=otp.Day(1), max_back_ticks_to_prepend=2,
                      keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
df = otp.run(data)
print(df)
```

```
                     Time        ORIGIN_TIMESTAMP  PRICE  SIZE
0 2022-03-02 00:00:00.000 2022-03-01 00:00:00.000    1.3   100
1 2022-03-02 00:00:00.000 2022-03-01 00:00:00.002    1.4    50
2 2022-03-02 00:00:00.000 2022-03-02 00:00:00.000    1.0   100
3 2022-03-02 00:00:00.001 2022-03-02 00:00:00.001    1.1   101
4 2022-03-02 00:00:00.002 2022-03-02 00:00:00.002    1.2   102
```
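The interplay of `back_to_first_tick`, `max_back_ticks_to_prepend`, `where_clause_for_back_ticks` and `keep_first_tick_timestamp` can be simulated on the tick data from the examples above. The function `prepend_back_ticks` is a hypothetical plain-Python model, not OneTick code; it assumes the look-back window includes its lower bound (as the 00:00:00.000 tick in the last output suggests) and that history is sorted by time. Each output row is `(Time, ORIGIN_TIMESTAMP, PRICE, SIZE)`.

```python
from datetime import datetime, timedelta


def ms(day: int, millis: int) -> datetime:
    """Timestamp on 2022-03-<day> at the given millisecond after midnight."""
    return datetime(2022, 3, day) + timedelta(milliseconds=millis)


# Ticks from the examples above: (Time, PRICE, SIZE)
history = [(ms(1, 0), 1.3, 100), (ms(1, 1), 1.4, 10), (ms(1, 2), 1.4, 50)]
today = [(ms(2, 0), 1.0, 100), (ms(2, 1), 1.1, 101), (ms(2, 2), 1.2, 102)]


def prepend_back_ticks(history, today, start, look_back, max_ticks=1,
                       where=lambda tick: True):
    """Hypothetical model: prepend up to ``max_ticks`` of the most recent
    ticks in [start - look_back, start) that pass ``where``."""
    candidates = [t for t in history
                  if start - look_back <= t[0] < start and where(t)]
    back = candidates[-max_ticks:]  # most recent matching ticks, in time order
    # Prepended ticks get Time = start but keep their original timestamp
    rows = [(start, time, price, size) for time, price, size in back]
    # In-interval ticks: ORIGIN_TIMESTAMP equals Time
    rows += [(time, time, price, size) for time, price, size in today]
    return rows


rows = prepend_back_ticks(history, today, start=ms(2, 0), look_back=timedelta(days=1),
                          max_ticks=2, where=lambda tick: tick[2] >= 50)
for row in rows:
    print(row)
```

With `where` left at its default, the same call reproduces the `max_back_ticks_to_prepend=2` output, and with `max_ticks=1` it reproduces the single prepended tick of the `back_to_first_tick` example.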
`presort` controls whether to use parallel data fetching when querying multiple bound symbols. It defaults to True when `symbols` is set. Set it to False to use sequential MERGE instead (useful for debugging or when order must be strictly preserved):

```python
# With presort (default): parallel fetching, faster for many symbols
data = otp.DataSource(
    db='US_COMP',
    tick_type='TRD',
    symbols=['AAPL', 'MSFT', 'GOOGL'],
    presort=True,  # this is the default when symbols is set
)

# Without presort: sequential merge
data = otp.DataSource(
    db='US_COMP',
    tick_type='TRD',
    symbols=['AAPL', 'MSFT', 'GOOGL'],
    presort=False,
)
```
`batch_size` and `concurrency` tune presort performance. `batch_size` controls how many symbols are processed per batch, and `concurrency` sets the number of CPU cores:

```python
# large_symbol_list is assumed to be a list of symbol names defined elsewhere
data = otp.DataSource(
    db='US_COMP',
    tick_type='TRD',
    symbols=large_symbol_list,  # e.g., 500+ symbols
    batch_size=50,              # process 50 symbols at a time
    concurrency=4,              # use 4 CPU cores
)
```
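The batch arithmetic behind `batch_size` can be sketched as splitting the symbol list into consecutive chunks. The `batches` helper and the placeholder symbol names are hypothetical, and the assumption that PRESORT groups symbols into consecutive chunks is for illustration only; how OneTick actually schedules batches across cores is not described here.

```python
from typing import Iterator, List, Sequence


def batches(symbols: Sequence[str], batch_size: int) -> Iterator[List[str]]:
    """Hypothetical helper: split a symbol list into consecutive chunks
    of at most ``batch_size`` symbols."""
    if batch_size < 1:
        raise ValueError('batch_size must be positive')
    for i in range(0, len(symbols), batch_size):
        yield list(symbols[i:i + batch_size])


symbols = [f'SYM{i}' for i in range(120)]  # placeholder symbol names
sizes = [len(batch) for batch in batches(symbols, 50)]
print(sizes)  # [50, 50, 20]
```

Larger batches mean fewer chunks to coordinate, at the cost of holding more symbols' data at once, which matches the trade-off stated for `batch_size`.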
`symbol_date` specifies the date for resolving symbols in date-dependent symbologies. It is only applicable when `symbols` is set:

```python
data = otp.DataSource(
    db='US_COMP',
    tick_type='TRD',
    symbols=['AAPL', 'MSFT'],
    symbol_date=otp.dt(2022, 3, 1),
)

# symbol_date also accepts integers in YYYYMMDD format
data = otp.DataSource(
    db='US_COMP',
    tick_type='TRD',
    symbols=['AAPL', 'MSFT'],
    symbol_date=20220301,
)
```
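The two accepted forms of `symbol_date` carry the same information. The hypothetical helper `to_yyyymmdd` below converts a standard `datetime.date` or `datetime.datetime` into the `YYYYMMDD` integer and validates an integer input; `otp.datetime` is not handled, and this is not how onetick.py converts the value internally.

```python
from datetime import date, datetime
from typing import Union


def to_yyyymmdd(value: Union[int, date]) -> int:
    """Hypothetical helper: normalize a symbol_date value to a YYYYMMDD integer.

    datetime.datetime is a subclass of datetime.date, so both are accepted;
    an integer is checked to be a valid calendar date in YYYYMMDD form.
    """
    if isinstance(value, int):
        datetime.strptime(str(value), '%Y%m%d')  # raises ValueError if invalid
        return value
    return value.year * 10000 + value.month * 100 + value.day


print(to_yyyymmdd(datetime(2022, 3, 1)))  # 20220301
print(to_yyyymmdd(20220301))              # 20220301
```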