otp.DataSource

class DataSource(db=None, symbol=utils.adaptive, tick_type=utils.adaptive, start=utils.adaptive, end=utils.adaptive, date=None, schema_policy=utils.adaptive, guess_schema=None, identify_input_ts=False, back_to_first_tick=0, keep_first_tick_timestamp=None, max_back_ticks_to_prepend=1, where_clause_for_back_ticks=None, symbols=None, presort=utils.adaptive, batch_size=None, concurrency=utils.default, schema=None, symbol_date=None, **kwargs)

Bases: Source

Construct a source providing data from a given db.

Warning

The default value of the schema_policy parameter enables automatic deduction of the data schema, but this is strongly discouraged in production code. For details see Schema deduction mechanism.

Parameters:
  • db (str, list of str, otp.DB, default=None) –

    Name(s) of the database or the database object(s).

    When passing a single database, the tick type can be embedded in the name using 'DB_NAME::TICK_TYPE' format (e.g., 'US_COMP::TRD').

    When passing a list of databases, each entry can include its own tick type (e.g., ['US_COMP::TRD', 'CME::QTE']). If some entries lack a tick type, the tick_type parameter is used to fill them in.

    When None, the database is expected to come as part of the symbol name (e.g., 'DB::SYMBOL'), and tick_type must be set explicitly.

  • symbol (str, list of str, Source, query, eval query, default= onetick.py.adaptive) – Symbol(s) from which data should be taken.

  • tick_type (str, list of str, default= onetick.py.adaptive) –

    Tick type of the data (e.g., 'TRD' for trades, 'QTE' for quotes).

    When adaptive (default), the tick type is auto-detected from the database. If auto-detection fails or multiple databases are specified, defaults to 'TRD'.

    Can be a list of strings (e.g., ['TRD', 'QTE']) to merge multiple tick types from the same database into a single data flow.

  • start (datetime.datetime, otp.datetime, onetick.py.adaptive, default= onetick.py.adaptive) – Start of the interval from which the data should be taken. Default is onetick.py.adaptive, making the final query deduce the time limits from the rest of the graph.

  • end (datetime.datetime, otp.datetime, onetick.py.adaptive, default= onetick.py.adaptive) – End of the interval from which the data should be taken. Default is onetick.py.adaptive, making the final query deduce the time limits from the rest of the graph.

  • date (datetime.datetime, otp.datetime, default=None) – Specifies a whole day instead of explicit start and end parameters. If it is set together with start and end, the latter two are ignored.

  • schema_policy (‘tolerant’, ‘tolerant_strict’, ‘fail’, ‘fail_strict’, ‘manual’, ‘manual_strict’, default= onetick.py.adaptive) –

    Schema deduction policy:

    • ’tolerant’ (default) The resulting schema is a combination of schema and the database schema. If the database schema can be deduced, it is checked for type compatibility with schema, and ValueError is raised if the check fails. With this policy the database is also scanned 5 days back to find the schema, which is useful when the database is misconfigured or around holidays.

    • ’tolerant_strict’ The resulting schema is schema if it is not empty; otherwise the database schema is used. If the database schema can be deduced, it is checked that it contains every field from schema and that it is type-compatible with schema, and ValueError is raised if either check fails. With this policy the database is also scanned 5 days back to find the schema, which is useful when the database is misconfigured or around holidays.

    • ’fail’ The same as ‘tolerant’, but if the database schema can’t be deduced, raises an Exception.

    • ’fail_strict’ The same as ‘tolerant_strict’, but if the database schema can’t be deduced, raises an Exception.

    • ’manual’ The resulting schema is a combination of schema and database schema. Compatibility with database schema will not be checked.

    • ’manual_strict’ The resulting schema will be exactly schema. Compatibility with database schema will not be checked. If some fields specified in schema do not exist in the database, their values will be set to some default value for a type (0 for integers, NaNs for floats, empty string for strings, epoch for datetimes).

    Default value is onetick.py.adaptive. In that case, if the deprecated parameter guess_schema is set to True the policy is ‘fail’, and if it is set to False the policy is ‘manual’. If schema_policy is set to None, the policy is ‘tolerant’.

    Default value can be changed with otp.config.default_schema_policy configuration parameter.

    If you set schema manually when creating a DataSource instance and don’t set schema_policy, it is automatically set to ‘manual’.

  • guess_schema (bool, default=None) –

    Deprecated since version 1.3.16.

    Use schema_policy parameter instead.

    If guess_schema is set to True then schema_policy value is ‘fail’, if False then ‘manual’.

  • identify_input_ts (bool, default=False) – If True, adds SYMBOL_NAME and TICK_TYPE fields to every output tick, identifying which symbol and tick type each tick came from. Especially useful when merging multiple symbols to distinguish the source of each tick.

  • back_to_first_tick (int, offset, otp.expr, Operation, default=0) –

    Determines how far back (in seconds) to search for the latest tick before start time. If one is found, it is prepended to the output with its timestamp changed to start time. This is useful for initializing state (e.g., getting the last known price before market open).

    Accepts an integer (seconds), a time offset like otp.Day(1) or otp.Hour(2), or an otp.expr for dynamic values.

    Note: the value is rounded to whole seconds, so otp.Millis(999) becomes 0. Use with keep_first_tick_timestamp to preserve the original tick time, and max_back_ticks_to_prepend to retrieve more than one historical tick.

  • keep_first_tick_timestamp (str, default=None) –

    Name for a new nsectime field that stores the original timestamp of prepended ticks. For ticks within the query interval, this field equals the Time field. For ticks prepended by back_to_first_tick, it contains their true historical timestamp (before it was overwritten with start time).

    This parameter is ignored if back_to_first_tick is 0.

  • max_back_ticks_to_prepend (int, default=1) –

    Maximum number of the most recent ticks before start time to prepend to the output. Only used when back_to_first_tick is non-zero. All prepended ticks have their timestamp changed to start time. Must be at least 1.

    For example, to get the last 5 trades before market open, set back_to_first_tick=otp.Day(1) and max_back_ticks_to_prepend=5.

  • where_clause_for_back_ticks (onetick.py.core.column_operations.base.Raw, default=None) –

    A filter expression applied only to ticks found during the backward search (controlled by back_to_first_tick). Ticks where this expression evaluates to False are skipped and not prepended.

    Must be an otp.raw expression with dtype=bool. For example, otp.raw('SIZE>=100', dtype=bool) keeps only ticks with SIZE >= 100.

  • symbols (str, list of str, Source, query, eval query, onetick.query.GraphQuery, default=None) – Symbol(s) from which data should be taken. Alias for the symbol parameter; takes precedence over it when both are given.

  • presort (bool, default= onetick.py.adaptive) –

    Controls whether to use a PRESORT Event Processor when querying multiple bound symbols. PRESORT parallelizes data fetching across symbols and merges results in timestamp order, which is generally faster than sequential MERGE for large symbol lists.

    Applicable only when symbols is set. By default, True when symbols is set, False otherwise. Set to False to use sequential MERGE instead.

  • batch_size (int, default=None) –

    Number of symbols to process in each batch during presort execution. Larger batch sizes reduce overhead but use more memory. Only applicable when presort is True.

    By default, the value from otp.config.default_batch_size is used.

  • concurrency (int, default= onetick.py.utils.default) –

    Specifies the number of CPU cores to utilize for the presort. By default, the value is inherited from the value of the query where this PRESORT is used.

    For the main query it may be specified in the concurrency parameter of run() method (which by default is set to otp.config.default_concurrency).

    For the auxiliary queries (like first-stage queries) empty value means OneTick’s default of 1. If otp.config.presort_force_default_concurrency is set then default concurrency value will be set in all PRESORT EPs in all queries.

  • schema (Optional[Dict[str, type]], default=None) –

    Dict of column name to column type pairs that the source is expected to have.

    Supported types: int, float, str, otp.string[N], otp.varstring[N], otp.nsectime, otp.msectime, otp.decimal, bytes.

    If the type of a column is irrelevant, provide None as the type.

    How the schema is used depends on schema_policy. When schema is set and schema_policy is not explicitly provided, schema_policy defaults to 'manual'.

  • symbol_date (otp.datetime or datetime.datetime or int, default=None) –

    Date used for symbol resolution in date-dependent symbologies, where the same symbol identifier can map to different instruments on different dates.

    Accepts otp.datetime, datetime.datetime, or an integer in the YYYYMMDD format (e.g., 20220301).

    Can only be specified when symbols is set. If symbols is a plain list of strings, it is internally converted to a first-stage query with the given symbol_date.

  • kwargs (type[str]) – Deprecated. Use schema instead. List of <column name> -> <column type> pairs that the source is expected to have. If the type is irrelevant, provide None as the type in question.
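
The defaulting rules for schema_policy listed above can be summarized in plain Python. This is only an illustrative sketch and not onetick-py code: the function resolve_schema_policy, the ADAPTIVE sentinel, and the order in which guess_schema and schema are checked are assumptions made for the example.

```python
# Illustrative sketch only; none of these names belong to onetick-py.
ADAPTIVE = object()  # hypothetical stand-in for onetick.py.adaptive


def resolve_schema_policy(schema_policy=ADAPTIVE, guess_schema=None,
                          schema=None, config_default="tolerant"):
    """Return the effective policy following the documented defaulting rules."""
    if schema_policy is None:
        return "tolerant"
    if schema_policy is not ADAPTIVE:
        return schema_policy  # an explicitly chosen policy is used as-is
    if guess_schema is not None:
        # Deprecated flag: True maps to 'fail', False maps to 'manual'.
        return "fail" if guess_schema else "manual"
    if schema:
        # A manually supplied schema switches the default to 'manual'.
        return "manual"
    # Stands in for otp.config.default_schema_policy.
    return config_default


print(resolve_schema_policy())                          # tolerant
print(resolve_schema_policy(schema={"PRICE": float}))   # manual
print(resolve_schema_policy(guess_schema=True))         # fail
```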

Note

If the interval set for a DataSource via the start/end or date parameters does not match the intervals of other Source objects used in the query, or does not match the whole query interval, then modify_query_times() is applied to this DataSource with the specified interval as its start and end times.

If the symbols parameter is omitted, you need to specify unbound symbols for the query in the symbols parameter of the onetick.py.run() function.

If symbols parameter is set, otp.merge is used to merge all passed bound symbols. In this case you don’t need to specify unbound symbols in onetick.py.run() call.

It’s not allowed to specify bound and unbound symbols at the same time.

Examples

Query a single symbol from a database, specifying db as a string:

>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols='S1')
>>> otp.run(data)
                     Time  X
0 2003-12-01 00:00:00.000  1
1 2003-12-01 00:00:00.001  2
2 2003-12-01 00:00:00.002  3

db can also be an otp.DB object:

db = otp.DB('US_COMP')
data = otp.DataSource(db=db, tick_type='TRD', symbols='AAPL')
otp.run(data)

db can be a list to merge data from multiple databases. Each entry can embed its tick type using 'DB_NAME::TICK_TYPE' format:

data = otp.DataSource(
    db=['US_COMP::TRD', 'CME::TRD'],
    symbols='AAPL',
)
otp.run(data)

When some databases in the list lack a tick type, tick_type fills them in:

# Equivalent to db=['US_COMP::TRD', 'CME::TRD']
data = otp.DataSource(
    db=['US_COMP', 'CME'],
    tick_type='TRD',
    symbols='AAPL',
)
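
The way db entries and tick_type combine can be pictured with a small pure-Python helper. It is a sketch of the rule described above, not the library's implementation; resolve_db_tick_types is a hypothetical name introduced for illustration.

```python
def resolve_db_tick_types(db, tick_type="TRD"):
    """Return (db_name, tick_type) pairs, filling in missing tick types.

    Hypothetical helper: mirrors the documented 'DB_NAME::TICK_TYPE' rule.
    """
    entries = [db] if isinstance(db, str) else list(db)
    resolved = []
    for entry in entries:
        name, sep, embedded = entry.partition("::")
        # Use the embedded tick type when present, else the tick_type argument.
        resolved.append((name, embedded if sep else tick_type))
    return resolved


print(resolve_db_tick_types(["US_COMP", "CME"], tick_type="TRD"))
# [('US_COMP', 'TRD'), ('CME', 'TRD')]
print(resolve_db_tick_types(["US_COMP::TRD", "CME"], tick_type="QTE"))
# [('US_COMP', 'TRD'), ('CME', 'QTE')]
```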

Parameter symbols can be a list. In this case specified symbols will be merged into a single data flow:

>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=['S1', 'S2'])
>>> otp.run(data)
                     Time  X
0 2003-12-01 00:00:00.000  1
1 2003-12-01 00:00:00.000 -3
2 2003-12-01 00:00:00.001  2
3 2003-12-01 00:00:00.001 -2
4 2003-12-01 00:00:00.002  3
5 2003-12-01 00:00:00.002 -1

Parameter identify_input_ts can be used to automatically add fields with the symbol name and tick type to each tick:

>>> data = otp.DataSource(db='SOME_DB', tick_type='TT', symbols=['S1', 'S2'], identify_input_ts=True)
>>> otp.run(data)
                     Time SYMBOL_NAME TICK_TYPE  X
0 2003-12-01 00:00:00.000          S1        TT  1
1 2003-12-01 00:00:00.000          S2        TT -3
2 2003-12-01 00:00:00.001          S1        TT  2
3 2003-12-01 00:00:00.001          S2        TT -2
4 2003-12-01 00:00:00.002          S1        TT  3
5 2003-12-01 00:00:00.002          S2        TT -1

A Source can also be passed as symbols. In that case the specially named column SYMBOL_NAME is used as the symbol and all other columns become symbol parameters:

>>> symbols = otp.Ticks(SYMBOL_NAME=['S1', 'S2'])
>>> data = otp.DataSource(db='SOME_DB', symbols=symbols, tick_type='TT')
>>> otp.run(data)
                     Time  X
0 2003-12-01 00:00:00.000  1
1 2003-12-01 00:00:00.000 -3
2 2003-12-01 00:00:00.001  2
3 2003-12-01 00:00:00.001 -2
4 2003-12-01 00:00:00.002  3
5 2003-12-01 00:00:00.002 -1
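
Conceptually, each tick of the symbols source is split into a symbol name and a set of symbol parameters. The following pure-Python sketch shows that split on plain dictionaries; split_symbol_row and the EXCHANGE column are hypothetical and exist only for this illustration.

```python
def split_symbol_row(row):
    """Split one tick of a symbols source into (symbol, symbol_params)."""
    params = dict(row)                   # copy so the input is not modified
    symbol = params.pop("SYMBOL_NAME")   # the special column names the symbol
    return symbol, params                # every other column is a parameter


rows = [
    {"SYMBOL_NAME": "S1", "EXCHANGE": "N"},
    {"SYMBOL_NAME": "S2", "EXCHANGE": "Q"},
]
for row in rows:
    print(split_symbol_row(row))
# ('S1', {'EXCHANGE': 'N'})
# ('S2', {'EXCHANGE': 'Q'})
```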

Use date to query a full day of data. It sets start to the beginning of the day and end to the last millisecond of the day:

>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 1))
>>> otp.run(data)
                     Time  PRICE  SIZE
0 2022-03-01 00:00:00.000    1.3   100
1 2022-03-01 00:00:00.001    1.4    10
2 2022-03-01 00:00:00.002    1.4    50

Alternatively, use start and end for a custom time interval. Standard datetime.datetime objects are also accepted:

import datetime
data = otp.DataSource(
    db='US_COMP', tick_type='TRD', symbols='AAPL',
    start=datetime.datetime(2022, 3, 1, 9, 30),
    end=datetime.datetime(2022, 3, 1, 16, 0),
)
otp.run(data)

Or using otp.datetime:

data = otp.DataSource(
    db='US_COMP', tick_type='TRD', symbols='AAPL',
    start=otp.dt(2022, 3, 1, 9, 30),
    end=otp.dt(2022, 3, 1, 16, 0),
)

If date is set together with start/end, date takes precedence.
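
The relationship between date, start and end can be sketched with the standard library alone. This follows the description above (start at the beginning of the day, end at its last millisecond); day_interval and effective_interval are hypothetical helpers, and the real library may compute the bounds differently.

```python
import datetime


def day_interval(date):
    """Return (start, end) covering the whole day of the given date."""
    start = datetime.datetime(date.year, date.month, date.day)
    end = start + datetime.timedelta(days=1) - datetime.timedelta(milliseconds=1)
    return start, end


def effective_interval(start=None, end=None, date=None):
    """When date is given it overrides any explicit start and end."""
    if date is not None:
        return day_interval(date)
    return start, end


start, end = effective_interval(
    start=datetime.datetime(2022, 3, 1, 9, 30),
    end=datetime.datetime(2022, 3, 1, 16, 0),
    date=datetime.datetime(2022, 3, 1),
)
print(start.isoformat(), end.isoformat())
# 2022-03-01T00:00:00 2022-03-01T23:59:59.999000
```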

tick_type can be a list to merge data from multiple tick types:

# Merge trades and quotes from the same database
data = otp.DataSource(
    db='US_COMP', tick_type=['TRD', 'QTE'],
    symbols='AAPL', identify_input_ts=True,
)
# Use identify_input_ts=True to tell which tick type each row came from

The default schema policy is tolerant, unless you specify the schema parameter and leave schema_policy at its default value, in which case it is set to manual.

>>> data = otp.DataSource(
...     db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 1),
... )
>>> data.schema
{'PRICE': <class 'float'>, 'SIZE': <class 'int'>}
>>> data = otp.DataSource(
...     db='US_COMP', tick_type='TRD', symbols='AAPL', schema={'PRICE': int},
...     schema_policy='tolerant', date=otp.dt(2022, 3, 1),
... )
Traceback (most recent call last):
  ...
ValueError: Database(-s) US_COMP::TRD schema field PRICE has type <class 'float'>,
but <class 'int'> was requested

Schema policy manual uses exactly schema:

>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', schema={'PRICE': float},
...                       date=otp.dt(2022, 3, 1), schema_policy='manual')
>>> data.schema
{'PRICE': <class 'float'>}

Schema policy fail raises an exception if the schema cannot be deduced:

>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2021, 3, 1),
...                       schema_policy='fail')
Traceback (most recent call last):
  ...
ValueError: No ticks found in database(-s) US_COMP::TRD

Schema policy tolerant_strict uses schema if provided, otherwise falls back to the database schema. It still validates type compatibility:

>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL',
...                       schema={'PRICE': float}, date=otp.dt(2022, 3, 1),
...                       schema_policy='tolerant_strict')
>>> data.schema
{'PRICE': <class 'float'>}

Schema policy manual_strict uses exactly the provided schema; the database schema is not consulted. Fields that don’t exist in the database get default values (0 for int, NaN for float, empty string for str):

data = otp.DataSource(
    db='US_COMP', tick_type='TRD', symbols='AAPL',
    schema={'PRICE': float, 'CUSTOM_FLAG': int},
    schema_policy='manual_strict',
)
# CUSTOM_FLAG will be 0 for all ticks since it doesn't exist in the database
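
The default-filling behaviour of manual_strict can be mimicked on plain dictionaries. This is a sketch under stated assumptions: fill_missing_fields and TYPE_DEFAULTS are hypothetical names, and datetime.datetime stands in for the OneTick timestamp types.

```python
import datetime
import math

# Defaults the documentation lists for fields that are missing in the database.
TYPE_DEFAULTS = {
    int: 0,
    float: math.nan,
    str: "",
    datetime.datetime: datetime.datetime(1970, 1, 1),  # epoch
}


def fill_missing_fields(tick, schema):
    """Return a tick with exactly the schema fields, defaulting absent ones."""
    return {name: tick.get(name, TYPE_DEFAULTS[dtype])
            for name, dtype in schema.items()}


tick = {"PRICE": 1.3, "SIZE": 100}
print(fill_missing_fields(tick, {"PRICE": float, "CUSTOM_FLAG": int}))
# {'PRICE': 1.3, 'CUSTOM_FLAG': 0}
```

Note that SIZE is dropped from the result because it is not part of the requested schema.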

Schema policy fail_strict is like tolerant_strict but raises an exception when the database schema cannot be determined.

The schema parameter accepts various types. Use None for columns whose type is irrelevant:

data = otp.DataSource(
    db='US_COMP', tick_type='TRD', symbols='AAPL',
    schema={
        'PRICE': float,              # 64-bit float
        'SIZE': int,                  # 64-bit integer
        'EXCHANGE': str,              # string (default length 64)
        'COND': otp.string[4],        # fixed-length string of 4 chars
        'MEMO': otp.varstring[256],   # variable-length string up to 256 chars
        'TRADE_TIME': otp.nsectime,   # nanosecond-precision timestamp
        'OTHER': None,                # type will be inferred from database
    },
)

back_to_first_tick sets how far back to go looking for the latest tick before start time:

>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       back_to_first_tick=otp.Day(1))
>>> otp.run(data)
                     Time  PRICE  SIZE
0 2022-03-02 00:00:00.000    1.4    50
1 2022-03-02 00:00:00.000    1.0   100
2 2022-03-02 00:00:00.001    1.1   101
3 2022-03-02 00:00:00.002    1.2   102

keep_first_tick_timestamp adds a field showing the original timestamp of each tick that was taken from before the start time of the query:

>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       back_to_first_tick=otp.Day(1), keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
>>> otp.run(data)
                     Time         ORIGIN_TIMESTAMP  PRICE  SIZE
0 2022-03-02 00:00:00.000  2022-03-01 00:00:00.002    1.4    50
1 2022-03-02 00:00:00.000  2022-03-02 00:00:00.000    1.0   100
2 2022-03-02 00:00:00.001  2022-03-02 00:00:00.001    1.1   101
3 2022-03-02 00:00:00.002  2022-03-02 00:00:00.002    1.2   102

max_back_ticks_to_prepend is used with back_to_first_tick when more than one tick before the start time should be retrieved:

>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       max_back_ticks_to_prepend=2, back_to_first_tick=otp.Day(1),
...                       keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
>>> otp.run(data)
                     Time         ORIGIN_TIMESTAMP  PRICE  SIZE
0 2022-03-02 00:00:00.000  2022-03-01 00:00:00.001    1.4    10
1 2022-03-02 00:00:00.000  2022-03-01 00:00:00.002    1.4    50
2 2022-03-02 00:00:00.000  2022-03-02 00:00:00.000    1.0   100
3 2022-03-02 00:00:00.001  2022-03-02 00:00:00.001    1.1   101
4 2022-03-02 00:00:00.002  2022-03-02 00:00:00.002    1.2   102

where_clause_for_back_ticks filters the ticks found before the start time; only ticks for which the expression is true are prepended:

>>> data = otp.DataSource(db='US_COMP', tick_type='TRD', symbols='AAPL', date=otp.dt(2022, 3, 2),
...                       where_clause_for_back_ticks=otp.raw('SIZE>=50', dtype=bool),
...                       back_to_first_tick=otp.Day(1), max_back_ticks_to_prepend=2,
...                       keep_first_tick_timestamp='ORIGIN_TIMESTAMP')
>>> otp.run(data)
                     Time         ORIGIN_TIMESTAMP  PRICE  SIZE
0 2022-03-02 00:00:00.000  2022-03-01 00:00:00.000    1.3   100
1 2022-03-02 00:00:00.000  2022-03-01 00:00:00.002    1.4    50
2 2022-03-02 00:00:00.000  2022-03-02 00:00:00.000    1.0   100
3 2022-03-02 00:00:00.001  2022-03-02 00:00:00.001    1.1   101
4 2022-03-02 00:00:00.002  2022-03-02 00:00:00.002    1.2   102
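
The interplay of back_to_first_tick, max_back_ticks_to_prepend, where_clause_for_back_ticks and keep_first_tick_timestamp can be emulated on an in-memory list of ticks. This is a rough pure-Python model rather than what OneTick executes: prepend_back_ticks is a hypothetical function, the inclusive lower bound of the lookback window is an assumption, and the sample ticks copy the values from the outputs above.

```python
import datetime as dt


def prepend_back_ticks(ticks, start, lookback_seconds, max_back_ticks=1, where=None):
    """Emulate the backward search on a time-sorted list of tick dicts."""
    # The lookback is truncated to whole seconds, as the parameter docs note.
    earliest = start - dt.timedelta(seconds=int(lookback_seconds))
    # Candidates: ticks inside the lookback window that pass the filter.
    back = [t for t in ticks
            if earliest <= t["Time"] < start and (where is None or where(t))]
    back = back[-max_back_ticks:]  # keep only the most recent candidates
    result = []
    for t in back + [t for t in ticks if t["Time"] >= start]:
        # Keep the true timestamp; prepended ticks get Time moved to start.
        result.append({**t, "ORIGIN_TIMESTAMP": t["Time"],
                       "Time": max(t["Time"], start)})
    return result


day1, day2 = dt.datetime(2022, 3, 1), dt.datetime(2022, 3, 2)
ms = dt.timedelta(milliseconds=1)
ticks = [
    {"Time": day1, "PRICE": 1.3, "SIZE": 100},
    {"Time": day1 + ms, "PRICE": 1.4, "SIZE": 10},
    {"Time": day1 + 2 * ms, "PRICE": 1.4, "SIZE": 50},
    {"Time": day2, "PRICE": 1.0, "SIZE": 100},
    {"Time": day2 + ms, "PRICE": 1.1, "SIZE": 101},
    {"Time": day2 + 2 * ms, "PRICE": 1.2, "SIZE": 102},
]
rows = prepend_back_ticks(ticks, start=day2, lookback_seconds=86400,
                          max_back_ticks=2, where=lambda t: t["SIZE"] >= 50)
print([r["SIZE"] for r in rows])  # [100, 50, 100, 101, 102]
```

The filter is applied before the tick count is limited, which is why the tick with SIZE 10 is skipped and the older tick with SIZE 100 is prepended instead.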

presort controls whether to use parallel data fetching when querying multiple bound symbols. It defaults to True when symbols is set. Set to False to use sequential MERGE instead (useful for debugging or when order must be strictly preserved):

# With presort (default) - parallel fetching, faster for many symbols
data = otp.DataSource(
    db='US_COMP', tick_type='TRD',
    symbols=['AAPL', 'MSFT', 'GOOGL'],
    presort=True,  # this is the default when symbols is set
)

# Without presort - sequential merge
data = otp.DataSource(
    db='US_COMP', tick_type='TRD',
    symbols=['AAPL', 'MSFT', 'GOOGL'],
    presort=False,
)

batch_size and concurrency tune presort performance. batch_size controls how many symbols are processed per batch, and concurrency sets the number of CPU cores:

data = otp.DataSource(
    db='US_COMP', tick_type='TRD',
    symbols=large_symbol_list,  # e.g., 500+ symbols
    batch_size=50,              # process 50 symbols at a time
    concurrency=4,              # use 4 CPU cores
)
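
To picture what batch_size means, the sketch below splits a symbol list into consecutive batches. It only illustrates the idea of batching; split_into_batches and the generated symbol names are hypothetical, and how OneTick schedules batches internally is not covered here.

```python
def split_into_batches(symbols, batch_size):
    """Yield consecutive chunks of at most batch_size symbols."""
    for i in range(0, len(symbols), batch_size):
        yield symbols[i:i + batch_size]


# Hypothetical symbol names used purely for illustration.
symbols = [f"SYM{i}" for i in range(120)]
print([len(batch) for batch in split_into_batches(symbols, 50)])
# [50, 50, 20]
```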

symbol_date specifies the date for resolving symbols in date-dependent symbologies. It is only applicable when symbols is set:

data = otp.DataSource(
    db='US_COMP', tick_type='TRD',
    symbols=['AAPL', 'MSFT'],
    symbol_date=otp.dt(2022, 3, 1),
)

# symbol_date also accepts integers in YYYYMMDD format
data = otp.DataSource(
    db='US_COMP', tick_type='TRD',
    symbols=['AAPL', 'MSFT'],
    symbol_date=20220301,
)
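
The integer form of symbol_date is a YYYYMMDD value. A minimal standard-library sketch of normalizing the accepted Python forms to a calendar date is shown below; parse_symbol_date is a hypothetical helper, not a onetick-py function.

```python
import datetime


def parse_symbol_date(value):
    """Normalize a YYYYMMDD integer or a datetime to a datetime.date."""
    if isinstance(value, int):
        # strptime rejects impossible dates such as 20220231.
        return datetime.datetime.strptime(str(value), "%Y%m%d").date()
    if isinstance(value, datetime.datetime):
        return value.date()
    return value


print(parse_symbol_date(20220301))                          # 2022-03-01
print(parse_symbol_date(datetime.datetime(2022, 3, 1, 9)))  # 2022-03-01
```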