otp.DataSource

class DataSource(db=None, symbol=utils.adaptive, tick_type=utils.adaptive, start=utils.adaptive, end=utils.adaptive, date=None, schema_policy=utils.adaptive, guess_schema=None, identify_input_ts=False, back_to_first_tick=0, keep_first_tick_timestamp=None, max_back_ticks_to_prepend=1, where_clause_for_back_ticks=None, symbols=None, presort=utils.adaptive, batch_size=None, concurrency=utils.default, schema=None, **kwargs)

Bases: onetick.py.core.source.Source

Construct a source providing data from a given db.

Warning

The default value of the schema_policy parameter enables automatic deduction of the data schema, which is strongly discouraged in production code. For details see Schema deduction mechanism.

Parameters
  • db (str, list of str, otp.DB, default=None) – Name(s) of the database or the database object(s).

  • symbol (str, list of str, Source, query, eval query, default= onetick.py.adaptive) – Symbol(s) from which data should be taken.

  • tick_type (str, list of str, default= onetick.py.adaptive) – Tick type of the data. If not specified, all ticks from db will be taken. If ticks cannot be found, or if several databases are specified in db, the default is “TRD”.

  • start (datetime.datetime, otp.datetime, onetick.py.adaptive, default= onetick.py.adaptive) – Start of the interval from which the data should be taken. Default is onetick.py.adaptive, making the final query deduce the time limits from the rest of the graph.

  • end (datetime.datetime, otp.datetime, onetick.py.adaptive, default= onetick.py.adaptive) – End of the interval from which the data should be taken. Default is onetick.py.adaptive, making the final query deduce the time limits from the rest of the graph.

  • date (datetime.datetime, otp.datetime, default=None) – Specifies a whole day instead of passing the start and end parameters explicitly. If it is set together with start and end, those two parameters are ignored.

  • schema_policy (‘tolerant’, ‘tolerant_strict’, ‘fail’, ‘fail_strict’, ‘manual’, ‘manual_strict’, default= onetick.py.adaptive) –

    Schema deduction policy:

    • ’tolerant’ (default) The resulting schema is a combination of schema and the database schema. If the database schema can be deduced, it is checked for type compatibility with schema, and ValueError is raised if the check fails. With this policy the database is also scanned 5 days back to find the schema, which is useful when the database is misconfigured or around holidays.

    • ’tolerant_strict’ The resulting schema is schema if it is not empty; otherwise, the database schema is used. If the database schema can be deduced, it is checked for fields from schema that it lacks and for type compatibility with schema, and ValueError is raised if either check fails. With this policy the database is also scanned 5 days back to find the schema, which is useful when the database is misconfigured or around holidays.

    • ’fail’ The same as ‘tolerant’, but if the database schema can’t be deduced, raises an Exception.

    • ’fail_strict’ The same as ‘tolerant_strict’, but if the database schema can’t be deduced, raises an Exception.

    • ’manual’ The resulting schema is a combination of schema and database schema. Compatibility with database schema will not be checked.

    • ’manual_strict’ The resulting schema will be exactly schema. Compatibility with database schema will not be checked. If some fields specified in schema do not exist in the database, their values will be set to some default value for a type (0 for integers, NaNs for floats, empty string for strings, epoch for datetimes).

    Default value is ‘tolerant’ (if deprecated parameter guess_schema is not set). If guess_schema is set to True then value is ‘fail’, if False then ‘manual’.

    Default value can be changed with otp.config.default_schema_policy configuration parameter.

  • guess_schema (bool, default=None) –

    Deprecated since version 1.3.16.

    Use schema_policy parameter instead.

    If guess_schema is set to True then schema_policy value is ‘fail’, if False then ‘manual’.

  • identify_input_ts (bool, default=False) – If set to False, the fields SYMBOL_NAME and TICK_TYPE are not appended to the output ticks.

  • back_to_first_tick (int, offset, otp.expr, Operation, default=0) – Determines how far back to go looking for the latest tick before start time. If one is found, it is inserted into the output time series with the timestamp set to start time. Note: it will be rounded to int, so otp.Millis(999) will be 0 seconds.

  • keep_first_tick_timestamp (str, default=None) – If set, a new field with this name is added to the source. For a tick taken from before the start time of the query, this field contains the tick’s original timestamp. For all other ticks, the value of this field equals the value of the Time field. This parameter is ignored if back_to_first_tick is not set.

  • max_back_ticks_to_prepend (int, default=1) – When the back_to_first_tick interval is specified, this parameter determines the maximum number of the most recent ticks before start_time that will be prepended to the output time series. Their timestamp will be changed to start_time.

  • where_clause_for_back_ticks (onetick.py.core.column_operations.base.Raw, default=None) – A logical expression that is computed only for the ticks encountered when a query goes back from the start time, in search of the ticks to prepend. If it returns false, a tick is ignored.

  • symbols (str, list of str, Source, query, eval query, onetick.query.GraphQuery, default=None) – Symbol(s) from which data should be taken. Alias for the symbol parameter; takes precedence over it.

  • presort (bool, default= onetick.py.adaptive) – Add the presort EP in case of bound symbols. Applicable only when symbols is not None. By default, it is set to True if symbols are set and to False otherwise.

  • batch_size (int, default=None) – Specifies the query batch size for the presort. By default, the value from otp.config.default_batch_size is used.

  • concurrency (int, default= onetick.py.utils.default) – Specifies the number of CPU cores to use for the presort. By default, the value is inherited from the concurrency parameter of the run() method of the original query (which by default is set to otp.config.default_concurrency).

  • schema (Optional[Dict[str, type]], default=None) – Dict of <column name> -> <column type> pairs that the source is expected to have. If the type is irrelevant, provide None as the type in question.

  • kwargs (type[str]) – Deprecated. Use schema instead. List of <column name> -> <column type> pairs that the source is expected to have. If the type is irrelevant, provide None as the type in question.
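
The rules given above for the default schema_policy value can be summarised in a small standalone sketch. The helper name default_schema_policy and its configured_default argument (standing in for otp.config.default_schema_policy) are hypothetical and exist only for illustration; this is not library code.

```python
def default_schema_policy(guess_schema=None, configured_default="tolerant"):
    """Hypothetical helper: the policy that applies when schema_policy is not passed."""
    if guess_schema is None:
        # Deprecated flag not set: fall back to the configured default.
        return configured_default
    # Deprecated flag set: True maps to 'fail', False maps to 'manual'.
    return "fail" if guess_schema else "manual"


print(default_schema_policy())                    # tolerant
print(default_schema_policy(guess_schema=True))   # fail
print(default_schema_policy(guess_schema=False))  # manual
```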

Note

If the interval set for DataSource via the start/end or date parameters does not match the intervals of other Source objects used in the query, or does not match the whole query interval, then modify_query_times() is applied to this DataSource with the specified interval as its start and end times.

If symbols parameter is omitted, you need to specify unbound symbols for the query in symbols parameter of onetick.py.run() function.

If symbols parameter is set, otp.merge is used to merge all passed bound symbols. In this case you don’t need to specify unbound symbols in onetick.py.run() call.

It’s not allowed to specify bound and unbound symbols at the same time.

Examples

Query a single symbol from a database:

>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1')
>>> otp.run(data)
                     Time  X
0 2003-12-01 00:00:00.000  1
1 2003-12-01 00:00:00.001  2
2 2003-12-01 00:00:00.002  3

Parameter symbols can be a list. In this case specified symbols will be merged into a single data flow:

>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=['S1', 'S2'])
>>> otp.run(data)
                     Time  X
0 2003-12-01 00:00:00.000  1
1 2003-12-01 00:00:00.000 -3
2 2003-12-01 00:00:00.001  2
3 2003-12-01 00:00:00.001 -2
4 2003-12-01 00:00:00.002  3
5 2003-12-01 00:00:00.002 -1

Parameter identify_input_ts can be used to automatically add the SYMBOL_NAME and TICK_TYPE fields to each tick:

>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=['S1', 'S2'], identify_input_ts=True)
>>> otp.run(data)
                     Time SYMBOL_NAME TICK_TYPE  X
0 2003-12-01 00:00:00.000          S1        TT  1
1 2003-12-01 00:00:00.000          S2        TT -3
2 2003-12-01 00:00:00.001          S1        TT  2
3 2003-12-01 00:00:00.001          S2        TT -2
4 2003-12-01 00:00:00.002          S1        TT  3
5 2003-12-01 00:00:00.002          S2        TT -1

A Source can also be passed as symbols. In that case the specially named column SYMBOL_NAME is used as the symbol, and all other columns become symbol parameters:

>>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'])
>>> data = otp.DataSource(db='SOME_DB', symbols=symbols, tick_type='TT')
>>> otp.run(data)
                     Time  X
0 2003-12-01 00:00:00.000  1
1 2003-12-01 00:00:00.000 -3
2 2003-12-01 00:00:00.001  2
3 2003-12-01 00:00:00.001 -2
4 2003-12-01 00:00:00.002  3
5 2003-12-01 00:00:00.002 -1

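The default schema policy is tolerant: the requested schema is combined with the database schema, and a type mismatch raises ValueError: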

>>> data = otp.DataSource(
...     db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', schema={'PRICE': float}, date=otp.dt(2022, 3, 1),
... )
>>> data.schema
{'PRICE': <class 'float'>, 'SIZE': <class 'int'>}
>>> data = otp.DataSource(
...     db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', schema={'PRICE': int}, date=otp.dt(2022, 3, 1),
... )
Traceback (most recent call last):
  ...
ValueError: Database(-s) NYSE_TAQ::TRD schema field PRICE has type <class 'float'>,
but <class 'int'> was requested
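
The two outcomes above can be modelled with plain dictionaries. This is only a toy sketch of the 'tolerant' behaviour, not the library's implementation: the function name combine_schemas is hypothetical, and "type-compatible" is assumed here to mean "identical Python type", which may be stricter than the real check.

```python
def combine_schemas(requested, db_schema, db_name="NYSE_TAQ::TRD"):
    """Toy model of 'tolerant': merge both schemas, reject conflicting types."""
    for field, wanted in requested.items():
        actual = db_schema.get(field)
        # A requested type of None means the type is irrelevant; a field
        # absent from the database schema has nothing to be compared with.
        if wanted is None or actual is None:
            continue
        if wanted is not actual:  # assumption: compatible == same type
            raise ValueError(
                f"Database(-s) {db_name} schema field {field} has type {actual}, "
                f"but {wanted} was requested"
            )
    combined = dict(db_schema)
    for field, wanted in requested.items():
        # Keep the database type when the requested type is None.
        if wanted is not None or field not in combined:
            combined[field] = wanted
    return combined


db_schema = {"PRICE": float, "SIZE": int}
print(combine_schemas({"PRICE": float}, db_schema))
try:
    combine_schemas({"PRICE": int}, db_schema)
except ValueError as err:
    print("ValueError:", err)
```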

Schema policy manual uses exactly schema:

>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', schema={'PRICE': float},
...                       date=otp.dt(2022, 3, 1), schema_policy='manual')
>>> data.schema
{'PRICE': <class 'float'>}
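
For comparison, the manual_strict policy described in the parameter list fills fields that are missing from the database with a per-type default. The sketch below models that rule on a single tick represented as a dict. The names TYPE_DEFAULTS and fill_missing are hypothetical, and "epoch" is assumed to mean 1970-01-01 00:00:00 without a timezone.

```python
import math
from datetime import datetime

# Defaults as listed for 'manual_strict': 0, NaN, empty string, epoch.
TYPE_DEFAULTS = {
    int: 0,
    float: math.nan,
    str: "",
    datetime: datetime(1970, 1, 1),  # assumption: naive epoch timestamp
}


def fill_missing(tick, schema):
    """Return a tick containing exactly the schema fields, with defaults for absent ones."""
    return {name: tick.get(name, TYPE_DEFAULTS[tp]) for name, tp in schema.items()}


# 'EXCHANGE' and 'SIZE' are not present in the stored tick, so defaults are used.
print(fill_missing({"PRICE": 1.3}, {"PRICE": float, "SIZE": int, "EXCHANGE": str}))
```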

Schema policy fail raises an exception if the schema cannot be deduced:

>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2021, 3, 1),
...                       schema_policy='fail')
Traceback (most recent call last):
  ...
ValueError: No ticks found in database(-s) NYSE_TAQ::TRD

back_to_first_tick sets how far back to go looking for the latest tick before start time:

>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       back_to_first_tick=otp.Day(1))
>>> otp.run(data)
                     Time  PRICE  SIZE
0 2022-03-02 00:00:00.000    1.4    50
1 2022-03-02 00:00:00.000    1.0   100
2 2022-03-02 00:00:00.001    1.1   101
3 2022-03-02 00:00:00.002    1.2   102

keep_first_tick_timestamp adds a field showing the original timestamp of the tick that was taken from before the start time of the query:

>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       back_to_first_tick=otp.Day(1), keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
>>> otp.run(data)
                     Time         ORIGIN_TIMESTAMP  PRICE  SIZE
0 2022-03-02 00:00:00.000  2022-03-01 00:00:00.002    1.4    50
1 2022-03-02 00:00:00.000  2022-03-02 00:00:00.000    1.0   100
2 2022-03-02 00:00:00.001  2022-03-02 00:00:00.001    1.1   101
3 2022-03-02 00:00:00.002  2022-03-02 00:00:00.002    1.2   102

max_back_ticks_to_prepend is used with back_to_first_tick when more than one tick before the start time should be retrieved:

>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       max_back_ticks_to_prepend=2, back_to_first_tick=otp.Day(1),
...                       keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
>>> otp.run(data)
                     Time         ORIGIN_TIMESTAMP  PRICE  SIZE
0 2022-03-02 00:00:00.000  2022-03-01 00:00:00.001    1.4    10
1 2022-03-02 00:00:00.000  2022-03-01 00:00:00.002    1.4    50
2 2022-03-02 00:00:00.000  2022-03-02 00:00:00.000    1.0   100
3 2022-03-02 00:00:00.001  2022-03-02 00:00:00.001    1.1   101
4 2022-03-02 00:00:00.002  2022-03-02 00:00:00.002    1.2   102

where_clause_for_back_ticks is used to filter out ticks before the start time:

>>> data = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       where_clause_for_back_ticks=otp.raw('SIZE>=50', dtype=bool),
...                       back_to_first_tick=otp.Day(1), max_back_ticks_to_prepend=2,
...                       keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
>>> df = otp.run(data)
>>> print(df)
                     Time         ORIGIN_TIMESTAMP  PRICE  SIZE
0 2022-03-02 00:00:00.000  2022-03-01 00:00:00.000    1.3   100
1 2022-03-02 00:00:00.000  2022-03-01 00:00:00.002    1.4    50
2 2022-03-02 00:00:00.000  2022-03-02 00:00:00.000    1.0   100
3 2022-03-02 00:00:00.001  2022-03-02 00:00:00.001    1.1   101
4 2022-03-02 00:00:00.002  2022-03-02 00:00:00.002    1.2   102
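
How the four back-tick parameters interact can be reproduced without a database. The sketch below is a toy model in plain Python, not the library's implementation. The function name prepend_back_ticks and its arguments are hypothetical. The sample ticks are reconstructed from the outputs shown above. The look-back window is assumed to include its lower bound, which is what those outputs suggest.

```python
from datetime import datetime, timedelta


def prepend_back_ticks(ticks, start, lookback, max_ticks=1, where=None, origin_field=None):
    """Toy model of back_to_first_tick and the parameters that depend on it."""
    ticks = sorted(ticks, key=lambda t: t["Time"])
    # Ticks before `start` that lie inside the look-back window and pass the filter.
    earlier = [
        t for t in ticks
        if start - lookback <= t["Time"] < start and (where is None or where(t))
    ]
    # Keep only the most recent `max_ticks` of them.
    earlier = earlier[-max_ticks:] if max_ticks > 0 else []
    result = []
    for t in earlier + [t for t in ticks if t["Time"] >= start]:
        out = dict(t)
        if origin_field is not None:
            out[origin_field] = t["Time"]   # original timestamp
        out["Time"] = max(t["Time"], start)  # back ticks are re-stamped to `start`
        result.append(out)
    return result


day1, day2 = datetime(2022, 3, 1), datetime(2022, 3, 2)
ms = timedelta(milliseconds=1)
ticks = [
    {"Time": day1, "PRICE": 1.3, "SIZE": 100},
    {"Time": day1 + ms, "PRICE": 1.4, "SIZE": 10},
    {"Time": day1 + 2 * ms, "PRICE": 1.4, "SIZE": 50},
    {"Time": day2, "PRICE": 1.0, "SIZE": 100},
    {"Time": day2 + ms, "PRICE": 1.1, "SIZE": 101},
    {"Time": day2 + 2 * ms, "PRICE": 1.2, "SIZE": 102},
]

rows = prepend_back_ticks(
    ticks, start=day2, lookback=timedelta(days=1), max_ticks=2,
    where=lambda t: t["SIZE"] >= 50, origin_field="ORIGIN_TIMESTAMP",
)
for row in rows:
    print(row["Time"], row["ORIGIN_TIMESTAMP"], row["PRICE"], row["SIZE"])
```

In this model the filter is applied before the most recent ticks are counted, so the tick with SIZE 10 is skipped and the two prepended ticks are the ones originally stamped 00:00:00.000 and 00:00:00.002 on 2022-03-01, as in the last example above.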