Skip to content

Run End To End Mock2 System

Overview

The goal is to run a System with Portfolio in the replayed time mode for a few bars.

The System is /dataflow_amp/system/mock2/mock2_forecast_system.py

The system is composed by the following components

flowchart LR
    MarketData --> PredictionDag
    subgraph Dag
      PredictionDag --> ProcessForecasts
    end
    subgraph Portfolio
        DataFramePortfolio --> DataFrameBroker
    end
    ProcessForecasts --> DataFramePortfolio
    DataFrameBroker --> Trades

High-level architecture and code organization

  • dataflow_amp/pipelines/mock2/mock2_pipeline.py
  • Builder that creates the prediction model
  • Mock2_DagBuilder

  • dataflow_amp/system/mock2/mock2_forecast_system.py

  • Build the full-system
  • Mock2_NonTime_ForecastSystem
  • Mock2_Time_ForecastSystem_with_DataFramePortfolio

  • dataflow_amp/system/mock2/mock2_forecast_system_example.py

  • Builder for the System used in backtesting and unit testing
  • get_Mock2_NonTime_ForecastSystem_example1()
  • get_Mock2_NonTime_ForecastSystem_example2()

  • dataflow_amp/system/mock2/mock2_tile_config_builders.py

  • Build tile configs for simulation
  • build_Mock2_tile_config_list()
  • build_Mock2_tile_config_list_for_unit_test()

  • dataflow_amp/system/mock2/test/test_mock2_forecast_system.py

  • Unit tests for Mock2_NonTime_ForecastSystem
  • Test_Mock2_System_CheckConfig
  • Test_Mock2_NonTime_ForecastSystem_FitPredict
  • ...

  • dataflow_amp/system/mock2/test/test_mock2_tiledbacktest.py

  • Run an end-to-end backtest for a Mock2 pipeline and analysis flow
  • Test_Mock2_NonTime_ForecastSystem_TiledBacktest

  • dataflow_amp/system/mock2/scripts/run_end_to_end_Mock2_system.py

  • Run an end-to-end streaming simulation

System Configuration

System parameters are controlled via SystemConfig, which is built dataflow_amp/system/mock2/scripts/run_end_to_end_Mock2_system.py.

The snippet of code below configures the input data, e.g., bar duration, history amount, number of assets

## Bar duration in seconds, e.g., 60 * 60 is 1 hour bar.
system.config["bar_duration_in_seconds"] = 60 * 60
system.config["market_data_config", "number_of_assets"] = 10
## History amount, e.g., 10 days worth of data.
system.config["market_data_config", "history_lookback"] = pd.Timedelta(
    days=10
)

The snippet of code below configures the maximum portfolio notional:

system.config[
    "process_forecasts_node_dict",
    "process_forecasts_dict",
    "optimizer_config",
    "params",
    "kwargs",
] = cconfig.Config.from_dict({"target_gmv": 1e7})

The snippet of code below configures for how long to run the System:

system.config["dag_runner_config", "rt_timeout_in_secs_or_time"] = (
    system.config["bar_duration_in_seconds"] * 2
)

The script saves the results to /app/system_log_dir together with the configuration file /app/system_log_dir/system_config.output.txt.

For the demo purposes only a few most important parameters are exposed while there are more parameters to control. The full SystemConfig looks like:

dag_config (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
  filter_weekends (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
    in_col_groups (marked_as_used=False, writer=None, val_type=list): [('close',), ('high',), ('low',), ('open',), ('volume',)]
    out_col_group (marked_as_used=False, writer=None, val_type=tuple): ()
    join_output_with_input (marked_as_used=False, writer=None, val_type=bool): False
  filter_ath (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
    in_col_groups (marked_as_used=False, writer=None, val_type=list): [('close',), ('high',), ('low',), ('open',), ('volume',)]
    out_col_group (marked_as_used=False, writer=None, val_type=tuple): ()
    transformer_kwargs (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
      start_time (marked_as_used=False, writer=None, val_type=datetime.time): 09:30:00
      end_time (marked_as_used=False, writer=None, val_type=datetime.time): 16:00:00
    join_output_with_input (marked_as_used=False, writer=None, val_type=bool): False
  resample (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
    in_col_groups (marked_as_used=False, writer=None, val_type=list): [('open',), ('high',), ('low',), ('close',), ('volume',)]
    out_col_group (marked_as_used=False, writer=None, val_type=tuple): ()
    transformer_kwargs (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
      rule (marked_as_used=False, writer=None, val_type=str): 60T
      resampling_groups (marked_as_used=False, writer=None, val_type=list): [({'close': 'close'}, 'last', {}), ({'high': 'high'}, 'max', {}), ({'low': 'low'}, 'min', {}), ({'open': 'open'}, 'first', {}), ({'volume': 'volume'}, 'sum', {'min_count': 1}), ({'close': 'twap'}, 'mean', {})]
      vwap_groups (marked_as_used=False, writer=None, val_type=list): [('close', 'volume', 'vwap')]
    reindex_like_input (marked_as_used=False, writer=None, val_type=bool): False
    join_output_with_input (marked_as_used=False, writer=None, val_type=bool): False
  compute_ret_0 (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
    in_col_groups (marked_as_used=False, writer=None, val_type=list): [('close',), ('vwap',), ('twap',)]
    out_col_group (marked_as_used=False, writer=None, val_type=tuple): ()
    transformer_kwargs (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
      mode (marked_as_used=False, writer=None, val_type=str): pct_change
    col_mapping (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
      close (marked_as_used=False, writer=None, val_type=str): close.ret_0
      vwap (marked_as_used=False, writer=None, val_type=str): vwap.ret_0
      twap (marked_as_used=False, writer=None, val_type=str): twap.ret_0
  compute_vol (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
    in_col_groups (marked_as_used=False, writer=None, val_type=list): [('vwap.ret_0',)]
    out_col_group (marked_as_used=False, writer=None, val_type=tuple): ()
    transformer_kwargs (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
      tau (marked_as_used=False, writer=None, val_type=int): 32
    col_mapping (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
      vwap.ret_0 (marked_as_used=False, writer=None, val_type=str): vwap.ret_0.vol
  adjust_rets (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
    in_col_groups (marked_as_used=False, writer=None, val_type=list): [('vwap.ret_0',), ('vwap.ret_0.vol',)]
    out_col_group (marked_as_used=False, writer=None, val_type=tuple): ()
    transformer_kwargs (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
      term1_col (marked_as_used=False, writer=None, val_type=str): vwap.ret_0
      term2_col (marked_as_used=False, writer=None, val_type=str): vwap.ret_0.vol
      out_col (marked_as_used=False, writer=None, val_type=str): vwap.ret_0.vol_adj
      term2_delay (marked_as_used=False, writer=None, val_type=int): 2
      operation (marked_as_used=False, writer=None, val_type=str): div
    drop_nans (marked_as_used=False, writer=None, val_type=bool): True
  clip (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
    in_col_groups (marked_as_used=False, writer=None, val_type=list): [('vwap.ret_0.vol_adj',)]
    out_col_group (marked_as_used=False, writer=None, val_type=tuple): ()
    col_mapping (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
        vwap.ret_0.vol_adj (marked_as_used=False, writer=None, val_type=str): vwap.ret_0.vol_adj.c
dag_builder_object (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::491::apply_dag_property, val_type=dataflow_amp.pipelines.mock2.mock2_pipeline.Mock2_DagBuilder): nid_prefix=
dag_builder_class (marked_as_used=False, writer=None, val_type=str): Mock2_DagBuilder
system_class (marked_as_used=False, writer=None, val_type=str): Mock2_Time_ForecastSystem_with_DataFramePortfolio
system_log_dir (marked_as_used=False, writer=None, val_type=str): ./system_log_dir
market_data_config (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
  system_log_dir (marked_as_used=False, writer=None, val_type=str): /app/system_log_dir
  bar_duration_in_seconds (marked_as_used=False, writer=None, val_type=int): 3600
  number_of_assets (marked_as_used=False, writer=None, val_type=int): 10
  history_lookback (marked_as_used=False, writer=None, val_type=pandas._libs.tslibs.timedeltas.Timedelta): 10 days 00:00:00
  replayed_delay_in_mins_or_timestamp (marked_as_used=False, writer=None, val_type=pandas._libs.tslibs.timestamps.Timestamp): 2023-08-15 10:30:00-04:00
  asset_ids (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::707::get_DataFramePortfolio_from_System, val_type=list): [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
  data (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::329::get_ReplayedMarketData_from_df, val_type=pandas.core.frame.DataFrame):
    index=[0, 7209]
    columns=start_datetime,end_datetime,timestamp_db,open,high,low,close,volume,asset_id
    shape=(7210, 9)
                 start_datetime              end_datetime              timestamp_db     open     high      low    close  volume  asset_id
    0 2023-08-01 09:00:00-04:00 2023-08-01 10:00:00-04:00 2023-08-01 10:00:10-04:00   999.89   999.89   999.89   999.89    13.0         0
    1 2023-08-01 09:00:00-04:00 2023-08-01 10:00:00-04:00 2023-08-01 10:00:10-04:00  1000.00  1000.00  1000.00  1000.00    14.0         1
    2 2023-08-01 09:00:00-04:00 2023-08-01 10:00:00-04:00 2023-08-01 10:00:10-04:00  1000.00  1000.00  1000.00  1000.00    12.0         2
    ...
    7207 2023-08-31 09:00:00-04:00 2023-08-31 10:00:00-04:00 2023-08-31 10:00:10-04:00  1002.39  1003.28  1002.39  1003.28   477.0         7
    7208 2023-08-31 09:00:00-04:00 2023-08-31 10:00:00-04:00 2023-08-31 10:00:10-04:00   986.35   986.35   985.84   986.15   466.0         8
    7209 2023-08-31 09:00:00-04:00 2023-08-31 10:00:00-04:00 2023-08-31 10:00:10-04:00   997.93   998.48   997.69   997.84   493.0         9
  delay_in_secs (marked_as_used=False, writer=None, val_type=int): 10
portfolio_config (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
  mark_to_market_col (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::713::get_DataFramePortfolio_from_System, val_type=str): close
  pricing_method (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::716::get_DataFramePortfolio_from_System, val_type=str): twap.60T
  column_remap (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
    bid (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::704::get_DataFramePortfolio_from_System, val_type=str): bid
    ask (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::704::get_DataFramePortfolio_from_System, val_type=str): ask
    midpoint (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::704::get_DataFramePortfolio_from_System, val_type=str): midpoint
    price (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::704::get_DataFramePortfolio_from_System, val_type=str): close
process_forecasts_node_dict (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
  prediction_col (marked_as_used=False, writer=None, val_type=str): vwap.ret_0.vol_adj.c
  volatility_col (marked_as_used=False, writer=None, val_type=str): vwap.ret_0.vol
  spread_col (marked_as_used=False, writer=None, val_type=NoneType): None
  portfolio (marked_as_used=False, writer=None, val_type=oms.portfolio.dataframe_portfolio.DataFramePortfolio):
    <oms.portfolio.dataframe_portfolio.DataFramePortfolio at 0x7fbc7502f340>
      # holdings_shares=
      Empty DataFrame
      Columns: []
      Index: []
      # holdings_notional=
      Empty DataFrame
      Columns: []
      Index: []
      # executed_trades_shares=
      Empty DataFrame
      Columns: []
      Index: []
      # executed_trades_notional=
      Empty DataFrame
      Columns: []
      Index: []
      # pnl=
      Empty DataFrame
      Columns: []
      Index: []
      # statistics=
      Empty DataFrame
      Columns: [pnl]
      Index: []
  process_forecasts_dict (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
    execution_mode (marked_as_used=False, writer=None, val_type=str): real_time
    log_dir (marked_as_used=False, writer=None, val_type=str): ./system_log_dir/process_forecasts
    order_config (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
      order_type (marked_as_used=False, writer=None, val_type=str): price@twap
      passivity_factor (marked_as_used=False, writer=None, val_type=NoneType): None
      order_duration_in_mins (marked_as_used=False, writer=None, val_type=int): 60
    optimizer_config (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
      backend (marked_as_used=False, writer=None, val_type=str): pomo
      params (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
        style (marked_as_used=False, writer=None, val_type=str): cross_sectional
        kwargs (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
          bulk_frac_to_remove (marked_as_used=False, writer=None, val_type=float): 0.0
          bulk_fill_method (marked_as_used=False, writer=None, val_type=str): zero
          target_gmv (marked_as_used=False, writer=None, val_type=float): 10000000.0
    ath_start_time (marked_as_used=False, writer=None, val_type=datetime.time): 09:30:00
    trading_start_time (marked_as_used=False, writer=None, val_type=datetime.time): 09:30:00
    ath_end_time (marked_as_used=False, writer=None, val_type=datetime.time): 16:00:00
    trading_end_time (marked_as_used=False, writer=None, val_type=datetime.time): 15:00:00
    liquidate_at_trading_end_time (marked_as_used=False, writer=None, val_type=bool): False
dag_runner_config (marked_as_used=False, writer=None, val_type=core.config.config_.Config):
  bar_duration_in_secs (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::1108::get_RealTimeDagRunner_from_System, val_type=int): 3600
  rt_timeout_in_secs_or_time (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::1111::get_RealTimeDagRunner_from_System, val_type=int): 7200
event_loop_object (marked_as_used=True, writer=/app/amp/dataflow/system/system_builder_utils.py::711::get_DataFramePortfolio_from_System, val_type=helpers.hasyncio._EventLoop): <_EventLoop running=False closed=False debug=False>

System components

MarketData

The source of data is ReplayedMarketData, an object that can replay a synthetic or previously capture dataframe. The data is represented by random OHLCV bars for N assets, K days of history and D bar duration in seconds, where N, K, D are configurable parameters that we control using SystemConfig.

Data snippet:

                                     start_datetime              timestamp_db     open     high      low    close  volume  asset_id

end_datetime 2023-08-08 15:00:00-04:00 2023-08-08 14:00:00-04:00 2023-08-08
15:00:10-04:00 985.34 986.36 984.87 986.25 935.0 0 2023-08-08 15:00:00-04:00
2023-08-08 14:00:00-04:00 2023-08-08 15:00:10-04:00 1005.38 1006.65 1005.07
1006.34 948.0 1 2023-08-08 15:00:00-04:00 2023-08-08 14:00:00-04:00 2023-08-08
15:00:10-04:00 1002.44 1002.49 1001.60 1001.88 1013.0 2

We also control via SystemConfig:

  • When to start replaying the data, e.g., the dataframe starts at 2023-08-01 10:00:00 but we start computing at 2023-08-15 10:30:00 so that there is enough history to warm up the system
  • Delay in seconds, i.e. the System simulates delay and waits for data to become available for X seconds

DagBuilder

A DagBuilder configures Nodes and connects them into a Dag in order to generate forecasts. The DagBuilder used in this example is the toy model Mock2_DagBuilder in /dataflow_amp/pipelines/mock2/mock2_pipeline.py

Mock2_DagBuilder is a pipeline that:

  • Filters out weekends
  • Removes rows outside active trading hours
  • Resamples the data to the desired frequency
  • Computes returns
  • Computes volatility
  • Adjusts returns using volatility
  • Clips returns

DAG

Dag is represented by:

  • RealTimeDataSource node that contains ReplayedMarketData and introduces real-time data behavior, e.g., the System waits for X seconds for the data
  • Nodes that are described by the Mock2_DagBuilder
  • ProcessForecastsNode that controls the Portfolio configuration

One can configure portfolio construction (e.g, maximum portfolio notional, i.e. target_gmv) via SystemConfig.

DagRunner

DagRunner is represented by RealTimeDagRunner which is an executor that controls how to run the System in streaming mode (both real-time and simulated).

One can configure any parameter (e.g., for how long to run the System, e.g., for 2 bars) via SystemConfig.

Portfolio

Portfolio is implemented by DataFramePortfolio with a DataFrameBroker which:

  • Keeps track of holdings using a DataFrame (instead of a database)
  • Has no advanced mechanism to control trade execution, i.e. all orders always are fully filled

System run

To run the System and save logs execute the following cmd:

docker> ./dataflow_amp/system/mock2/scripts/run_end_to_end_Mock2_system.py 2>&1 | tee tmp.log_system.txt

The System starts at 2023-08-15 11:00:00-04:00 and computes the DAG for 2 bars.

## Real-time loop: num_it=1: rt_time_out_in_secs=7200 wall_clock_time='2023-08-15 11:00:00-04:00' real_wall_clock_time='2024-01-05 08:39:32.981505-05:00'

It waits for the data to become available for 10 seconds (configurable):

#### waiting on last bar: num_iter=10/120: current_bar_timestamp=2023-08-15 11:00:00-04:00 wall_clock_time=2023-08-15 11:00:10-04:00 last_db_end_time=2023-08-15 11:00:00-04:00

08:39:33 rss=0.292GB vms=1.261GB mem_pct=1% Task-3 hprint.py log_frame:604

## Waiting on last bar: done

And once the data is ready the System computes the Dag:

#################################################################################
Executing method 'predict' for node topological_id=0 nid='read_data' ...
#################################################################################
...
#################################################################################
Executing method 'predict' for node topological_id=8 nid='process_forecasts' ...
#################################################################################

When executing the ProcessForecastsNode the System:

  • Computes target positions
  • Generates orders
  • Submits orders (in this case all orders are fully filled)
## last target positions=

          holdings_shares    price  holdings_notional      wall_clock_timestamp  prediction  volatility  spread  target_holdings_notional  target_trades_notional  target_trades_shares  target_holdings_shares

asset_id 0 0 983.66 0 2023-08-15 11:00:11-04:00 0.9674 0.000967 0 586609.98842
586609.98842 596.35442 596.35442 1 0 1010.59 0 2023-08-15 11:00:11-04:00
0.772048 0.001099 0 454185.047454 454185.047454 449.42563 449.42563 2 0 1005.54
0 2023-08-15 11:00:11-04:00 -0.607692 0.000675 0 -1202955.27038 -1202955.27038
-1196.327615 -1196.327615 ... 7 0 1014.55 0 2023-08-15 11:00:11-04:00 -0.632565
0.000568 0 -1699546.60519 -1699546.60519 -1675.17284 -1675.17284 8 0 996.88 0
2023-08-15 11:00:11-04:00 -1.287815 0.000904 0 -670108.728394 -670108.728394
-672.206011 -672.206011 9 0 993.89 0 2023-08-15 11:00:11-04:00 -0.02503 0.00066
0 1259035.977478 1259035.977478 1266.775979 1266.775979 ...

## last orders=

Order: order*id=0 creation_timestamp=2023-08-15 11:00:11-04:00 asset_id=0
type*=price@twap start*timestamp=2023-08-15 11:00:11-04:00
end_timestamp=2023-08-15 12:00:00-04:00 curr_num_shares=0.0
diff_num_shares=596.354419637 tz=America/New_York extra_params={} Order:
order_id=1 creation_timestamp=2023-08-15 11:00:11-04:00 asset_id=1
type*=price@twap start*timestamp=2023-08-15 11:00:11-04:00
end_timestamp=2023-08-15 12:00:00-04:00 curr_num_shares=0.0
diff_num_shares=449.425630032 tz=America/New_York extra_params={} Order:
order_id=2 creation_timestamp=2023-08-15 11:00:11-04:00 asset_id=2
type*=price@twap start*timestamp=2023-08-15 11:00:11-04:00
end_timestamp=2023-08-15 12:00:00-04:00 curr_num_shares=0.0
diff_num_shares=-1196.327615391 tz=America/New_York extra_params={} Order:
order_id=3 creation_timestamp=2023-08-15 11:00:11-04:00 asset_id=3
type*=price@twap start*timestamp=2023-08-15 11:00:11-04:00
end_timestamp=2023-08-15 12:00:00-04:00 curr_num_shares=0.0
diff_num_shares=-686.685193608 tz=America/New_York extra_params={} Order:
order_id=4 creation_timestamp=2023-08-15 11:00:11-04:00 asset_id=4
type*=price@twap start*timestamp=2023-08-15 11:00:11-04:00
end_timestamp=2023-08-15 12:00:00-04:00 curr_num_shares=0.0
diff_num_shares=386.275392292 tz=America/New_York extra_params={} Order:
order_id=5 creation_timestamp=2023-08-15 11:00:11-04:00 asset_id=5
type*=price@twap start*timestamp=2023-08-15 11:00:11-04:00
end_timestamp=2023-08-15 12:00:00-04:00 curr_num_shares=0.0
diff_num_shares=1960.903428683 tz=America/New_York extra_params={} Order:
order_id=6 creation_timestamp=2023-08-15 11:00:11-04:00 asset_id=6
type*=price@twap start*timestamp=2023-08-15 11:00:11-04:00
end_timestamp=2023-08-15 12:00:00-04:00 curr_num_shares=0.0
diff_num_shares=-1086.072750644 tz=America/New_York extra_params={} Order:
order_id=7 creation_timestamp=2023-08-15 11:00:11-04:00 asset_id=7
type*=price@twap start*timestamp=2023-08-15 11:00:11-04:00
end_timestamp=2023-08-15 12:00:00-04:00 curr_num_shares=0.0
diff_num_shares=-1675.172840363 tz=America/New_York extra_params={} Order:
order_id=8 creation_timestamp=2023-08-15 11:00:11-04:00 asset_id=8
type*=price@twap start*timestamp=2023-08-15 11:00:11-04:00
end_timestamp=2023-08-15 12:00:00-04:00 curr_num_shares=0.0
diff_num_shares=-672.206011149 tz=America/New_York extra_params={} Order:
order_id=9 creation_timestamp=2023-08-15 11:00:11-04:00 asset_id=9
type*=price@twap start_timestamp=2023-08-15 11:00:11-04:00
end_timestamp=2023-08-15 12:00:00-04:00 curr_num_shares=0.0
diff_num_shares=1266.775978708 tz=America/New_York extra_params={}

Then the System goes to sleep waiting for the next bar to start:

08:39:37 rss=0.296GB vms=1.325GB mem*pct=1%% Task-3 process_forecasts*.py
process_forecasts:353 Event: exiting process_forecasts() for loop. 08:39:37
rss=0.296GB vms=1.325GB mem_pct=1% Task-3 real_time_dag_runner.py \_run_dag:264
Waiting on node 'process_forecasts': done 08:39:37 rss=0.296GB vms=1.325GB
mem_pct=1% Task-1 real_time.py execute_with_real_time_loop:422 await done
(wall_clock_time=2023-08-15 12:00:00-04:00)

Since the clock is simulated, instead of waiting 1 hour, the System moves the clock forward within a few seconds and starts to compute the 2nd bar:

## Real-time loop: num_it=2: rt_time_out_in_secs=7200 wall_clock_time='2023-08-15 12:00:00-04:00' real_wall_clock_time='2024-01-05 08:39:37.416849-05:00'

Then the System repeats the Dag computation and order submission but for the next bar and exits once the termination condition becomes True (run for 2 bars in this case):

08:39:42 rss=0.297GB vms=1.326GB mem_pct=1% Task-1 hwall_clock_time.py
set_current_bar_timestamp:105 timestamp=2023-08-15 13:00:00-04:00 08:39:42
rss=0.297GB vms=1.326GB mem_pct=1%% Task-1 real_time.py
execute_with_real_time_loop:433 rt_timeout_in_secs_or_time=7200,
bar_duration_in_secs=3600, num_it=2, num_iterations=2, is_done=True 08:39:42
rss=0.297GB vms=1.326GB mem_pct=1% - ^[[36mINFO ^[[0m Task-1 real_time.py
execute_with_real_time_loop:443 Exiting loop: num_it=2, num_iterations=2

The output is in the dir:

> tree.sh -p system_log_dir
system_log_dir/
|-- process_forecasts/
|   |-- orders/
|   |   |-- 20230815_110000.20230815_110011.csv
|   |   `-- 20230815_120000.20230815_120011.csv
|   |-- portfolio/
|   |   |-- executed_trades_notional/
|   |   |   |-- 20230815_110000.20230815_110011.csv
|   |   |   `-- 20230815_120000.20230815_120011.csv
|   |   |-- executed_trades_shares/
|   |   |   |-- 20230815_110000.20230815_110011.csv
|   |   |   `-- 20230815_120000.20230815_120011.csv
|   |   |-- holdings_notional/
|   |   |   |-- 20230815_110000.20230815_110011.csv
|   |   |   `-- 20230815_120000.20230815_120011.csv
|   |   |-- holdings_shares/
|   |   |   |-- 20230815_110000.20230815_110011.csv
|   |   |   `-- 20230815_120000.20230815_120011.csv
|   |   `-- statistics/
|   |       |-- 20230815_110000.20230815_110011.csv
|   |       `-- 20230815_120000.20230815_120011.csv
|   `-- target_positions/
|       |-- 20230815_110000.20230815_110011.csv
|       `-- 20230815_120000.20230815_120011.csv
|-- system_config.input.txt
|-- system_config.input.values_as_strings.pkl
|-- system_config.output.txt
`-- system_config.output.values_as_strings.pkl

10 directories, 18 files

TODO(gp): @all Describe the output