Ray usage examples

Example remote function

import ray
import onetick.py as otp

# Special decorator to run code remotely
@ray.remote(max_retries=1)
def example_otp_code():

    # here goes OTP code you want to run
    data = otp.DataSource(db='NYSE_TAQ',
                          tick_type='TRD',
                          start=otp.dt(2022, 4, 1),
                          end=otp.dt(2022, 4, 2))

    data['VOLUME'] = data['PRICE'] * data['SIZE']
    return data.to_df()

# Initialize Ray connection
ray.init()

# Run your code on Ray and get results back
df = ray.get(example_otp_code.remote())

# Shutdown Ray connection
ray.shutdown()

# Continue using df as a regular local pandas.DataFrame object
print(df.head())
                            Time EXCHANGE  COND STOP_STOCK SOURCE TRF TTE TICKER   PRICE        DELETED_TIME  TICK_STATUS  SIZE  CORR  SEQ_NUM TRADE_ID           PARTICIPANT_TIME            TRF_TIME  OMDSEQ   VOLUME
0 2022-04-01 04:00:00.018381502        K  @ TI                 N       0   AAPL  175.00 1969-12-31 19:00:00            0     1     0     1970        1 2022-04-01 04:00:00.000186 1969-12-31 19:00:00       0   175.00
1 2022-04-01 04:00:00.018693590        K  @ TI                 N       0   AAPL  175.00 1969-12-31 19:00:00            0     3     0     1971        2 2022-04-01 04:00:00.000186 1969-12-31 19:00:00       1   525.00
2 2022-04-01 04:00:00.018702708        K  @ TI                 N       0   AAPL  175.01 1969-12-31 19:00:00            0     3     0     1972        3 2022-04-01 04:00:00.000186 1969-12-31 19:00:00       2   525.03
3 2022-04-01 04:00:00.018876909        K  @ TI                 N       0   AAPL  175.03 1969-12-31 19:00:00            0     1     0     1973        4 2022-04-01 04:00:00.000186 1969-12-31 19:00:00       3   175.03
4 2022-04-01 04:00:00.059225208        K  @FTI                 N       1   AAPL  175.08 1969-12-31 19:00:00            0    49     0     2024        5 2022-04-01 04:00:00.058673 1969-12-31 19:00:00       0  8578.92

Example function with arguments

Remote functions can take arguments. Call them as you would a regular function, except that the arguments go inside the function.remote() call.

# Remote function with arguments
@ray.remote(max_retries=1)
def get_BBO_offset(start, num_orders, offset):
    # Create the order flow.
    # In practice, it can be taken from a CSV file or from a DataFrame.
    order = otp.Ticks(timezone_for_time='EST5EDT',
                      start=start,
                      end=start + otp.Hour(1),
                      offset=[otp.Milli(x * 500) for x in range(num_orders)],
                      ID=list(range(num_orders)))
    order['ARRIVAL'] = order['Time']
    # Raw string keeps the backslash literal without an invalid-escape warning
    order['SYMBOL'] = r'NQ\H22'
    order()
    q = order.join_with_query(
        otp.DataSource('CME', tick_type='QTE', back_to_first_tick=600),
        symbol=(order['SYMBOL']),
        start_time=order['ARRIVAL'] + otp.Milli(int(offset * 1000)),
        end_time=order['ARRIVAL'] + otp.Milli(int(offset * 1000)),
    )
    return q.to_df()

# Initialize Ray connection
ray.init()

# Call remote function with specific arguments
df = ray.get(get_BBO_offset.remote(start=otp.dt(2022, 3, 2, 10), num_orders=5, offset=.5))
print(df.head())

# Call it again with other arguments
df_other_arguments = ray.get(get_BBO_offset.remote(start=otp.dt(2022, 3, 2, 10), num_orders=10, offset=-2))
print(df_other_arguments.head())

# Shutdown Ray connection
ray.shutdown()
                    Time  ID                 ARRIVAL  SYMBOL  BID_PRICE  BID_SIZE  BID_NUM_ORDERS  BID_SIZE_IMPLIED  ASK_PRICE  ASK_SIZE  ASK_NUM_ORDERS  ASK_SIZE_IMPLIED  OMDSEQ
0 2022-03-02 10:00:00.000   0 2022-03-02 10:00:00.000  NQ\H22   14076.75         3               3                 0   14077.75         1               1                 0       1
1 2022-03-02 10:00:00.500   1 2022-03-02 10:00:00.500  NQ\H22   14084.00         1               1                 0   14084.75         1               1                 0       4
2 2022-03-02 10:00:01.000   2 2022-03-02 10:00:01.000  NQ\H22   14083.75         2               2                 0   14084.75         1               1                 0       4
3 2022-03-02 10:00:01.500   3 2022-03-02 10:00:01.500  NQ\H22   14080.25         4               3                 0   14081.25         3               2                 0       1
4 2022-03-02 10:00:02.000   4 2022-03-02 10:00:02.000  NQ\H22   14078.25         1               1                 0   14079.00         3               3                 0       1
                    Time  ID                 ARRIVAL  SYMBOL  BID_PRICE  BID_SIZE  BID_NUM_ORDERS  BID_SIZE_IMPLIED  ASK_PRICE  ASK_SIZE  ASK_NUM_ORDERS  ASK_SIZE_IMPLIED  OMDSEQ
0 2022-03-02 10:00:00.000   0 2022-03-02 10:00:00.000  NQ\H22   14079.25         1               1                 0   14080.00         2               2                 0      10
1 2022-03-02 10:00:00.500   1 2022-03-02 10:00:00.500  NQ\H22   14079.50         1               1                 0   14080.25         1               1                 0       7
2 2022-03-02 10:00:01.000   2 2022-03-02 10:00:01.000  NQ\H22   14080.00         1               1                 0   14080.75         2               2                 0       1
3 2022-03-02 10:00:01.500   3 2022-03-02 10:00:01.500  NQ\H22   14073.25         1               1                 0   14074.00         1               1                 0       1
4 2022-03-02 10:00:02.000   4 2022-03-02 10:00:02.000  NQ\H22   14075.25         1               1                 0   14076.00         1               1                 0       4

Limitations

Running code remotely imposes some limitations:

  • You cannot use custom or imported modules inside remote functions; compute all arguments before calling the remote function.

  • The Ray instance is isolated from the public Internet.

  • Run only onetick.py-specific code remotely to reduce the Ray instance's resource consumption (memory, CPU).

  • You cannot pass file pointers as arguments; pass the file content as an argument instead.
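
The last limitation can be illustrated with a minimal sketch that uses only the Python standard library. The helper count_orders and the sample CSV data are hypothetical and are not part of onetick.py or Ray. In an actual Ray setup the function would presumably carry the @ray.remote decorator and be called as count_orders.remote(csv_text); here it is called directly so that the pattern can be tried locally.

```python
import csv
import io
import os
import tempfile


def count_orders(csv_text):
    # Hypothetical worker body. It receives plain text rather than a file
    # object, so nothing here depends on the caller's filesystem.
    reader = csv.DictReader(io.StringIO(csv_text))
    return sum(1 for _ in reader)


# Create a small CSV file so the sketch is self-contained (illustrative data).
with tempfile.NamedTemporaryFile('w', suffix='.csv', delete=False, newline='') as f:
    f.write('ID,SYMBOL\n0,AAPL\n1,MSFT\n')
    path = f.name

# Read the file on the client side and pass its content, not the file pointer.
with open(path, newline='') as f:
    csv_text = f.read()
os.remove(path)

num_orders = count_orders(csv_text)
print(num_orders)
```

The same idea applies to the first limitation: anything that needs a custom module is computed on the client first, and only the resulting plain values (strings, numbers, lists) are handed to the remote function.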

Using apply() method in remote context

Because of the way otp.Source.apply is implemented, every function or lambda expression passed to it in a remote context must be wrapped with the otp.remote decorator.

import ray
import onetick.py as otp

@otp.remote
def match_condition(row):
    if row['COND'].str.contains('O'):
        return 1
    if row['COND'].str.contains('6') == True:
        return 1
    if row['COND'].str.contains('9') == True:
        return 1
    else:
        return 0

@ray.remote(max_retries=1)
def quicktest(start, end, symbol):
    ds_trd = otp.DataSource(db='NYSE_TAQ', tick_type='TRD', start=start, end=end)
    ds_trd.schema['COND'] = str
    ds_trd['OC_TRD'] = ds_trd.apply(match_condition)
    return ds_trd.to_df(symbol=[symbol])

start = otp.dt(2022, 8, 25, 9, 29)
end = otp.dt(2022, 8, 25, 16, 30)
symbol = 'BAC'
ray.init()
result = ray.get(quicktest.remote(start, end, symbol))
print(result)
ray.shutdown()