diff --git a/examples/train.py b/examples/train.py index f889ac9b6..b669dc59f 100644 --- a/examples/train.py +++ b/examples/train.py @@ -46,6 +46,10 @@ def parse_args(args): parser.add_argument( '--rl_trainer', type=str, default="rllib", help='the RL trainer to use. either rllib or Stable-Baselines') + parser.add_argument( + '--load_weights_path', type=str, default=None, + help='Path to h5 file containing a pretrained model. Relevent for PPO with RLLib' + ) parser.add_argument( '--algorithm', type=str, default="PPO", help='RL algorithm to use. Options are PPO, TD3, and CENTRALIZEDPPO (which uses a centralized value function)' @@ -177,11 +181,15 @@ def setup_exps_rllib(flow_params, alg_run = flags.algorithm.upper() if alg_run == "PPO": - from flow.algorithms.custom_ppo import CustomPPOTrainer + from flow.algorithms.imitation_learning.custom_ppo import CustomPPOTrainer from ray.rllib.agents.ppo import DEFAULT_CONFIG - alg_run = CustomPPOTrainer config = deepcopy(DEFAULT_CONFIG) + + alg_run = CustomPPOTrainer + + horizon = flow_params['env'].horizon + config["num_workers"] = n_cpus config["horizon"] = horizon config["model"].update({"fcnet_hiddens": [32, 32]}) @@ -195,6 +203,21 @@ def setup_exps_rllib(flow_params, if flags.grid_search: config["lambda"] = tune.grid_search([0.5, 0.9]) config["lr"] = tune.grid_search([5e-4, 5e-5]) + + if flags.load_weights_path: + from flow.algorithms.imitation_learning.ppo_model import PPONetwork + from flow.algorithms.imitation_learning.custom_trainable import Imitation_PPO_Trainable + from ray.rllib.models import ModelCatalog + + # Register custom model + ModelCatalog.register_custom_model("PPO_loaded_weights", PPONetwork) + # set model to the custom model for run + config['model']['custom_model'] = "PPO_loaded_weights" + config['model']['custom_options'] = {"h5_load_path": flags.load_weights_path} + config['observation_filter'] = 'NoFilter' + # alg run is the Trainable class + alg_run = Imitation_PPO_Trainable + elif alg_run == "CENTRALIZEDPPO": from flow.algorithms.centralized_PPO import CCTrainer, CentralizedCriticModel from ray.rllib.agents.ppo import DEFAULT_CONFIG @@ -327,7 +350,6 @@ def on_train_result(info): register_env(gym_name, create_env) return alg_run, gym_name, config - def train_rllib(submodule, flags): """Train policies using the PPO algorithm in RLlib.""" import ray @@ -356,6 +378,7 @@ def trial_str_creator(trial): ray.init(local_mode=True) else: ray.init() + exp_dict = { "run_or_experiment": alg_run, "name": flags.exp_title or flow_params['exp_tag'], diff --git a/flow/algorithms/__init__.py b/flow/algorithms/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/flow/algorithms/imitation_learning/__init__.py b/flow/algorithms/imitation_learning/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/flow/algorithms/imitation_learning/custom_ppo.py b/flow/algorithms/imitation_learning/custom_ppo.py new file mode 100644 index 000000000..c7e81e13c --- /dev/null +++ b/flow/algorithms/imitation_learning/custom_ppo.py @@ -0,0 +1,230 @@ +""" +Copied from RLLib's PPO, but uses CustomPPOTFPolicy, which tracks value function predictions in Tensorboard. 
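A minimal usage sketch (illustrative only, not part of the patch; the registered
environment name, the config values, and the h5 path are placeholders):

    from flow.algorithms.imitation_learning.custom_ppo import CustomPPOTrainer

    trainer = CustomPPOTrainer(env="flow_singleagent-v0", config={"num_workers": 1})
    # `import_model` is attached to Trainer at the bottom of this module and
    # delegates to `import_policy_model_from_h5` for h5 files
    trainer.import_model("imitation_ppo_weights.h5")
    result = trainer.train()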
+""" + +import logging +import os + +from ray.rllib.agents import with_common_config +from ray.tune.trial import ExportFormat + +from flow.algorithms.imitation_learning.custom_ppo_tf_policy import CustomPPOTFPolicy +from ray.rllib.agents.trainer_template import build_trainer +from ray.rllib.optimizers import SyncSamplesOptimizer, LocalMultiGPUOptimizer +from ray.rllib.utils import try_import_tf + +tf = try_import_tf() + +logger = logging.getLogger(__name__) + +# yapf: disable +# __sphinx_doc_begin__ +DEFAULT_CONFIG = with_common_config({ + # Should use a critic as a baseline (otherwise don't use value baseline; + # required for using GAE). + "use_critic": True, + # If true, use the Generalized Advantage Estimator (GAE) + # with a value function, see https://arxiv.org/pdf/1506.02438.pdf. + "use_gae": True, + # The GAE(lambda) parameter. + "lambda": 1.0, + # Initial coefficient for KL divergence. + "kl_coeff": 0.2, + # Size of batches collected from each worker. + "rollout_fragment_length": 200, + # Number of timesteps collected for each SGD round. This defines the size + # of each SGD epoch. + "train_batch_size": 4000, + # Total SGD batch size across all devices for SGD. This defines the + # minibatch size within each epoch. + "sgd_minibatch_size": 128, + # Whether to shuffle sequences in the batch when training (recommended). + "shuffle_sequences": True, + # Number of SGD iterations in each outer loop (i.e., number of epochs to + # execute per train batch). + "num_sgd_iter": 30, + # Stepsize of SGD. + "lr": 5e-5, + # Learning rate schedule. + "lr_schedule": None, + # Share layers for value function. If you set this to True, it's important + # to tune vf_loss_coeff. + "vf_share_layers": False, + # Coefficient of the value function loss. IMPORTANT: you must tune this if + # you set vf_share_layers: True. + "vf_loss_coeff": 1.0, + # Coefficient of the entropy regularizer. + "entropy_coeff": 0.0, + # Decay schedule for the entropy regularizer. + "entropy_coeff_schedule": None, + # PPO clip parameter. + "clip_param": 0.3, + # Clip param for the value function. Note that this is sensitive to the + # scale of the rewards. If your expected V is large, increase this. + "vf_clip_param": 10.0, + # If specified, clip the global norm of gradients by this amount. + "grad_clip": None, + # Target value for KL divergence. + "kl_target": 0.01, + # Whether to rollout "complete_episodes" or "truncate_episodes". + "batch_mode": "truncate_episodes", + # Which observation filter to apply to the observation. + "observation_filter": "NoFilter", + # Uses the sync samples optimizer instead of the multi-gpu one. This is + # usually slower, but you might want to try it if you run into issues with + # the default optimizer. + "simple_optimizer": False, + # Use PyTorch as framework? 
+ "use_pytorch": False +}) +# __sphinx_doc_end__ +# yapf: enable + + +def choose_policy_optimizer(workers, config): + if config["simple_optimizer"]: + return SyncSamplesOptimizer( + workers, + num_sgd_iter=config["num_sgd_iter"], + train_batch_size=config["train_batch_size"], + sgd_minibatch_size=config["sgd_minibatch_size"], + standardize_fields=["advantages"]) + + return LocalMultiGPUOptimizer( + workers, + sgd_batch_size=config["sgd_minibatch_size"], + num_sgd_iter=config["num_sgd_iter"], + num_gpus=config["num_gpus"], + rollout_fragment_length=config["rollout_fragment_length"], + num_envs_per_worker=config["num_envs_per_worker"], + train_batch_size=config["train_batch_size"], + standardize_fields=["advantages"], + shuffle_sequences=config["shuffle_sequences"]) + + +def update_kl(trainer, fetches): + # Single-agent. + if "kl" in fetches: + trainer.workers.local_worker().for_policy( + lambda pi: pi.update_kl(fetches["kl"])) + + # Multi-agent. + else: + + def update(pi, pi_id): + if pi_id in fetches: + pi.update_kl(fetches[pi_id]["kl"]) + else: + logger.debug("No data for {}, not updating kl".format(pi_id)) + + trainer.workers.local_worker().foreach_trainable_policy(update) + + +def warn_about_bad_reward_scales(trainer, result): + if result["policy_reward_mean"]: + return # Punt on handling multiagent case. + + # Warn about excessively high VF loss. + learner_stats = result["info"]["learner"] + if "default_policy" in learner_stats: + scaled_vf_loss = (trainer.config["vf_loss_coeff"] * + learner_stats["default_policy"]["vf_loss"]) + policy_loss = learner_stats["default_policy"]["policy_loss"] + if trainer.config["vf_share_layers"] and scaled_vf_loss > 100: + logger.warning( + "The magnitude of your value function loss is extremely large " + "({}) compared to the policy loss ({}). This can prevent the " + "policy from learning. Consider scaling down the VF loss by " + "reducing vf_loss_coeff, or disabling vf_share_layers.".format( + scaled_vf_loss, policy_loss)) + + # Warn about bad clipping configs + if trainer.config["vf_clip_param"] <= 0: + rew_scale = float("inf") + else: + rew_scale = round( + abs(result["episode_reward_mean"]) / + trainer.config["vf_clip_param"], 0) + if rew_scale > 200: + logger.warning( + "The magnitude of your environment rewards are more than " + "{}x the scale of `vf_clip_param`. ".format(rew_scale) + + "This means that it will take more than " + "{} iterations for your value ".format(rew_scale) + + "function to converge. If this is not intended, consider " + "increasing `vf_clip_param`.") + + +def validate_config(config): + if config["entropy_coeff"] < 0: + raise DeprecationWarning("entropy_coeff must be >= 0") + if isinstance(config["entropy_coeff"], int): + config["entropy_coeff"] = float(config["entropy_coeff"]) + if config["sgd_minibatch_size"] > config["train_batch_size"]: + raise ValueError( + "Minibatch size {} must be <= train batch size {}.".format( + config["sgd_minibatch_size"], config["train_batch_size"])) + if config["batch_mode"] == "truncate_episodes" and not config["use_gae"]: + raise ValueError( + "Episode truncation is not supported without a value " + "function. Consider setting batch_mode=complete_episodes.") + if config["multiagent"]["policies"] and not config["simple_optimizer"]: + logger.info( + "In multi-agent mode, policies will be optimized sequentially " + "by the multi-GPU optimizer. 
Consider setting " + "simple_optimizer=True if this doesn't work for you.") + if config["simple_optimizer"]: + logger.warning( + "Using the simple minibatch optimizer. This will significantly " + "reduce performance, consider simple_optimizer=False.") + elif config["use_pytorch"] or (tf and tf.executing_eagerly()): + config["simple_optimizer"] = True # multi-gpu not supported + + +def get_policy_class(config): + if config.get("use_pytorch") is True: + from ray.rllib.agents.ppo.ppo_torch_policy import PPOTorchPolicy + return PPOTorchPolicy + else: + return CustomPPOTFPolicy + + +CustomPPOTrainer = build_trainer( + name="PPO", + default_config=DEFAULT_CONFIG, + default_policy=CustomPPOTFPolicy, + get_policy_class=get_policy_class, + make_policy_optimizer=choose_policy_optimizer, + validate_config=validate_config, + after_optimizer_step=update_kl, + after_train_result=warn_about_bad_reward_scales) + + +from ray.rllib.policy.sample_batch import DEFAULT_POLICY_ID +def import_model(self, import_file, policy_id=DEFAULT_POLICY_ID): + """Imports a model from import_file. + + Note: Currently, only h5 files are supported. + + Args: + import_file (str): The file to import the model from. + + Returns: + A dict that maps ExportFormats to successfully exported models. + """ + # Check for existence. + if not os.path.exists(import_file): + raise FileNotFoundError( + "`import_file` '{}' does not exist! Can't import Model.". + format(import_file)) + # Get the format of the given file. + import_format = "h5" # TODO(sven): Support checkpoint loading. + + ExportFormat.validate([import_format]) + if import_format != ExportFormat.H5: + raise NotImplementedError + else: + return self.import_policy_model_from_h5(import_file, policy_id=policy_id) + +from ray.rllib.agents import Trainer +setattr(Trainer, 'import_model', import_model) \ No newline at end of file diff --git a/flow/algorithms/imitation_learning/custom_ppo_tf_policy.py b/flow/algorithms/imitation_learning/custom_ppo_tf_policy.py new file mode 100644 index 000000000..680b7cf76 --- /dev/null +++ b/flow/algorithms/imitation_learning/custom_ppo_tf_policy.py @@ -0,0 +1,288 @@ +""" +Copied from RLLIb's ppo_tf_policy, but additionally tracks value function predictions in kl_and_loss_stats. Used +to evaluate the value function learned after imitation. +""" + +import logging + +import ray +from ray.rllib.agents.impala.vtrace_policy import BEHAVIOUR_LOGITS +from ray.rllib.evaluation.postprocessing import compute_advantages, \ + Postprocessing +from ray.rllib.policy.sample_batch import SampleBatch +from ray.rllib.policy.policy import ACTION_LOGP +from ray.rllib.policy.tf_policy import LearningRateSchedule, \ + EntropyCoeffSchedule +from ray.rllib.policy.tf_policy_template import build_tf_policy +from ray.rllib.utils.explained_variance import explained_variance +from ray.rllib.utils.tf_ops import make_tf_callable +from ray.rllib.utils import try_import_tf + +tf = try_import_tf() + +logger = logging.getLogger(__name__) + + +class PPOLoss: + def __init__(self, + dist_class, + model, + value_targets, + advantages, + actions, + prev_logits, + prev_actions_logp, + vf_preds, + curr_action_dist, + value_fn, + cur_kl_coeff, + valid_mask, + entropy_coeff=0, + clip_param=0.1, + vf_clip_param=0.1, + vf_loss_coeff=1.0, + use_gae=True): + """Constructs the loss for Proximal Policy Objective. + + Arguments: + dist_class: action distribution class for logits. + value_targets (Placeholder): Placeholder for target values; used + for GAE. 
+ actions (Placeholder): Placeholder for actions taken + from previous model evaluation. + advantages (Placeholder): Placeholder for calculated advantages + from previous model evaluation. + prev_logits (Placeholder): Placeholder for logits output from + previous model evaluation. + prev_actions_logp (Placeholder): Placeholder for action prob output + from the previous (before update) Model evaluation. + vf_preds (Placeholder): Placeholder for value function output + from the previous (before update) Model evaluation. + curr_action_dist (ActionDistribution): ActionDistribution + of the current model. + value_fn (Tensor): Current value function output Tensor. + cur_kl_coeff (Variable): Variable holding the current PPO KL + coefficient. + valid_mask (Optional[tf.Tensor]): An optional bool mask of valid + input elements (for max-len padded sequences (RNNs)). + entropy_coeff (float): Coefficient of the entropy regularizer. + clip_param (float): Clip parameter + vf_clip_param (float): Clip parameter for the value function + vf_loss_coeff (float): Coefficient of the value function loss + use_gae (bool): If true, use the Generalized Advantage Estimator. + """ + if valid_mask is not None: + + def reduce_mean_valid(t): + return tf.reduce_mean(tf.boolean_mask(t, valid_mask)) + + else: + + def reduce_mean_valid(t): + return tf.reduce_mean(t) + + prev_dist = dist_class(prev_logits, model) + # Make loss functions. + logp_ratio = tf.exp(curr_action_dist.logp(actions) - prev_actions_logp) + action_kl = prev_dist.kl(curr_action_dist) + self.mean_kl = reduce_mean_valid(action_kl) + + curr_entropy = curr_action_dist.entropy() + self.mean_entropy = reduce_mean_valid(curr_entropy) + + surrogate_loss = tf.minimum( + advantages * logp_ratio, + advantages * tf.clip_by_value(logp_ratio, 1 - clip_param, + 1 + clip_param)) + self.mean_policy_loss = reduce_mean_valid(-surrogate_loss) + + if use_gae: + vf_loss1 = tf.square(value_fn - value_targets) + vf_clipped = vf_preds + tf.clip_by_value( + value_fn - vf_preds, -vf_clip_param, vf_clip_param) + vf_loss2 = tf.square(vf_clipped - value_targets) + vf_loss = tf.maximum(vf_loss1, vf_loss2) + self.mean_vf_loss = reduce_mean_valid(vf_loss) + loss = reduce_mean_valid( + -surrogate_loss + cur_kl_coeff * action_kl + + vf_loss_coeff * vf_loss - entropy_coeff * curr_entropy) + else: + self.mean_vf_loss = tf.constant(0.0) + loss = reduce_mean_valid(-surrogate_loss + + cur_kl_coeff * action_kl - + entropy_coeff * curr_entropy) + self.loss = loss + + +def ppo_surrogate_loss(policy, model, dist_class, train_batch): + logits, state = model.from_batch(train_batch) + action_dist = dist_class(logits, model) + + mask = None + if state: + max_seq_len = tf.reduce_max(train_batch["seq_lens"]) + mask = tf.sequence_mask(train_batch["seq_lens"], max_seq_len) + mask = tf.reshape(mask, [-1]) + + policy.loss_obj = PPOLoss( + dist_class, + model, + train_batch[Postprocessing.VALUE_TARGETS], + train_batch[Postprocessing.ADVANTAGES], + train_batch[SampleBatch.ACTIONS], + train_batch[BEHAVIOUR_LOGITS], + train_batch[ACTION_LOGP], + train_batch[SampleBatch.VF_PREDS], + action_dist, + model.value_function(), + policy.kl_coeff, + mask, + entropy_coeff=policy.entropy_coeff, + clip_param=policy.config["clip_param"], + vf_clip_param=policy.config["vf_clip_param"], + vf_loss_coeff=policy.config["vf_loss_coeff"], + use_gae=policy.config["use_gae"], + ) + + return policy.loss_obj.loss + + +def kl_and_loss_stats(policy, train_batch): + return { + "cur_kl_coeff": tf.cast(policy.kl_coeff, tf.float64), + 
"cur_lr": tf.cast(policy.cur_lr, tf.float64), + "total_loss": policy.loss_obj.loss, + "policy_loss": policy.loss_obj.mean_policy_loss, + "vf_loss": policy.loss_obj.mean_vf_loss, + "vf_preds": policy.model.value_function(), + "vf_targets": train_batch[Postprocessing.VALUE_TARGETS], + "vf_explained_var": explained_variance( + train_batch[Postprocessing.VALUE_TARGETS], + policy.model.value_function()), + "kl": policy.loss_obj.mean_kl, + "entropy": policy.loss_obj.mean_entropy, + "entropy_coeff": tf.cast(policy.entropy_coeff, tf.float64), + } + + +def vf_preds_and_logits_fetches(policy): + """Adds value function and logits outputs to experience train_batches.""" + return { + SampleBatch.VF_PREDS: policy.model.value_function(), + BEHAVIOUR_LOGITS: policy.model.last_output(), + } + + +def postprocess_ppo_gae(policy, + sample_batch, + other_agent_batches=None, + episode=None): + """Adds the policy logits, VF preds, and advantages to the trajectory.""" + + completed = sample_batch["dones"][-1] + if completed: + last_r = 0.0 + else: + next_state = [] + for i in range(policy.num_state_tensors()): + next_state.append([sample_batch["state_out_{}".format(i)][-1]]) + last_r = policy._value(sample_batch[SampleBatch.NEXT_OBS][-1], + sample_batch[SampleBatch.ACTIONS][-1], + sample_batch[SampleBatch.REWARDS][-1], + *next_state) + batch = compute_advantages( + sample_batch, + last_r, + policy.config["gamma"], + policy.config["lambda"], + use_gae=policy.config["use_gae"]) + return batch + + +def clip_gradients(policy, optimizer, loss): + variables = policy.model.trainable_variables() + if policy.config["grad_clip"] is not None: + grads_and_vars = optimizer.compute_gradients(loss, variables) + grads = [g for (g, v) in grads_and_vars] + policy.grads, _ = tf.clip_by_global_norm(grads, + policy.config["grad_clip"]) + clipped_grads = list(zip(policy.grads, variables)) + return clipped_grads + else: + return optimizer.compute_gradients(loss, variables) + + +class KLCoeffMixin: + def __init__(self, config): + # KL Coefficient + self.kl_coeff_val = config["kl_coeff"] + self.kl_target = config["kl_target"] + self.kl_coeff = tf.get_variable( + initializer=tf.constant_initializer(self.kl_coeff_val), + name="kl_coeff", + shape=(), + trainable=False, + dtype=tf.float32) + + def update_kl(self, sampled_kl): + if sampled_kl > 2.0 * self.kl_target: + self.kl_coeff_val *= 1.5 + elif sampled_kl < 0.5 * self.kl_target: + self.kl_coeff_val *= 0.5 + self.kl_coeff.load(self.kl_coeff_val, session=self.get_session()) + return self.kl_coeff_val + + +class ValueNetworkMixin: + def __init__(self, obs_space, action_space, config): + if config["use_gae"]: + + @make_tf_callable(self.get_session()) + def value(ob, prev_action, prev_reward, *state): + model_out, _ = self.model({ + SampleBatch.CUR_OBS: tf.convert_to_tensor([ob]), + SampleBatch.PREV_ACTIONS: tf.convert_to_tensor( + [prev_action]), + SampleBatch.PREV_REWARDS: tf.convert_to_tensor( + [prev_reward]), + "is_training": tf.convert_to_tensor(False), + }, [tf.convert_to_tensor([s]) for s in state], + tf.convert_to_tensor([1])) + return self.model.value_function()[0] + + else: + + @make_tf_callable(self.get_session()) + def value(ob, prev_action, prev_reward, *state): + return tf.constant(0.0) + + self._value = value + + +def setup_config(policy, obs_space, action_space, config): + # auto set the model option for layer sharing + config["model"]["vf_share_layers"] = config["vf_share_layers"] + + +def setup_mixins(policy, obs_space, action_space, config): + 
ValueNetworkMixin.__init__(policy, obs_space, action_space, config) + KLCoeffMixin.__init__(policy, config) + EntropyCoeffSchedule.__init__(policy, config["entropy_coeff"], + config["entropy_coeff_schedule"]) + LearningRateSchedule.__init__(policy, config["lr"], config["lr_schedule"]) + + +CustomPPOTFPolicy = build_tf_policy( + name="PPOTFPolicy", + get_default_config=lambda: ray.rllib.agents.ppo.ppo.DEFAULT_CONFIG, + loss_fn=ppo_surrogate_loss, + stats_fn=kl_and_loss_stats, + extra_action_fetches_fn=vf_preds_and_logits_fetches, + postprocess_fn=postprocess_ppo_gae, + gradients_fn=clip_gradients, + before_init=setup_config, + before_loss_init=setup_mixins, + mixins=[ + LearningRateSchedule, EntropyCoeffSchedule, KLCoeffMixin, + ValueNetworkMixin + ]) diff --git a/flow/algorithms/imitation_learning/custom_trainable.py b/flow/algorithms/imitation_learning/custom_trainable.py new file mode 100644 index 000000000..993113607 --- /dev/null +++ b/flow/algorithms/imitation_learning/custom_trainable.py @@ -0,0 +1,65 @@ +from ray import tune +try: + from ray.rllib.agents.agent import get_agent_class +except ImportError: + from ray.rllib.agents.registry import get_agent_class +import flow.algorithms.imitation_learning.custom_ppo as custom_ppo + +class Imitation_PPO_Trainable(tune.Trainable): + """ + Class to train PPO with imitation, with Tune. Extends Trainable. + """ + + def _setup(self, config): + """ + Sets up trainable. See superclass definition. + """ + + env_name = config['env'] + self.trainer = custom_ppo.CustomPPOTrainer(env=env_name, config=config) + # kind of hacky, but don't know a better solution to the default policy not existing + policy_id = list(self.trainer.get_weights().keys())[0] + print("test: ", list(self.trainer.get_weights().keys())) + self.trainer.import_model(config['model']['custom_options']['h5_load_path'], policy_id=policy_id) + + def _train(self): + """ + Executes one training iteration on trainer. See superclass definition. + """ + + return self.trainer.train() + + def _save(self, tmp_checkpoint_dir): + """ + Saves trainer. See superclass definition. + """ + return self.trainer._save(tmp_checkpoint_dir) + + def _restore(self, checkpoint): + """ + Restores trainer from checkpoint. See superclass definition. + """ + self.trainer.restore(checkpoint) + + def _log_result(self, result): + """ + Logs results of trainer. See superclass definition. + """ + self.trainer._log_result(result) + + def _stop(self): + """ + Stops trainer. See superclass definition. + """ + self.trainer.stop() + + def _export_model(self, export_formats, export_dir): + """ + Exports trainer model. See superclass definition. + """ + return self.trainer.export_model(export_formats, export_dir=export_dir) + + + + + diff --git a/flow/algorithms/imitation_learning/imitating_controller.py b/flow/algorithms/imitation_learning/imitating_controller.py new file mode 100644 index 000000000..115930744 --- /dev/null +++ b/flow/algorithms/imitation_learning/imitating_controller.py @@ -0,0 +1,69 @@ +from flow.controllers.base_controller import BaseController + + +class ImitatingController(BaseController): + """ + Controller which uses a given neural net to imitate an expert. 
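Example (a sketch; the vehicle id, network dimensions, and tensorboard path are
placeholders, and `create_tf_session` is the session helper used in trainer.py):

    from flow.core.params import SumoCarFollowingParams
    from flow.algorithms.imitation_learning.imitating_network import ImitatingNetwork
    from flow.algorithms.imitation_learning.utils_tensorflow import create_tf_session

    sess = create_tf_session()
    net = ImitatingNetwork(sess, action_dim=1, obs_dim=3, fcnet_hiddens=[32, 32],
                           replay_buffer_size=100000, learning_rate=1e-3,
                           tensorboard_path="/tmp/imitation_tb/")
    controller = ImitatingController("rl_0", net, multiagent=False,
                                     car_following_params=SumoCarFollowingParams())
    # accel = controller.get_accel(env)  # env is a Flow Gym environment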
Subclasses BaseController + """ + + # Implementation in Tensorflow Keras + + def __init__(self, veh_id, action_network, multiagent, car_following_params=None, time_delay=0.0, noise=0, fail_safe=None): + """ + Parameters + __________ + veh_id: String + ID of vehicle to control + action_network: ImitatingNetwork + Instance of imitating_network class; neural net that gives action given state + multiagent: bool + boolean indicating if env is multiagent or singleagent + """ + + BaseController.__init__(self, veh_id, car_following_params, delay=time_delay, fail_safe=fail_safe, noise=noise) + self.action_network = action_network + self.multiagent = multiagent + self.veh_id = veh_id + + + def get_accel(self, env): + """ + Get acceleration for vehicle in the environment. Overrides superclass method. + Parameters + __________ + env: Gym Env + instance of environment being used + """ + # observation is a dictionary for multiagent envs, list for singleagent envs + + if self.multiagent: + # if vehicle is in non-control edge, it will not be in observation, so return None to default control to Sumo + if self.veh_id not in env.get_state().keys(): + return None + observation = env.get_state()[self.veh_id] + else: + observation = env.get_state() + + # get action from neural net + action = self.action_network.get_accel_from_observation(observation)[0] + + # handles singleagent case in which there are multiple RL vehicles sharing common state + # if action space is multidimensional, obtain the corresponding action for the vehicle + if not self.multiagent and self.action_network.action_dim > 1: + + # get_sorted_rl_ids used for singleagent_straight_road; use get_rl_ids if method does not exist + if hasattr(env, 'get_sorted_rl_ids'): + rl_ids = env.get_sorted_rl_ids() + else: + rl_ids = env.get_rl_ids() + + if not (self.veh_id in rl_ids): + # vehicle in non-control edge, so return None to default control to Sumo + return None + + # return the action taken by the vehicle + ind = rl_ids.index(self.veh_id) + return action[ind] + + # in other cases, acceleration is the output of the network + return action diff --git a/flow/algorithms/imitation_learning/imitating_network.py b/flow/algorithms/imitation_learning/imitating_network.py new file mode 100644 index 000000000..6e9e9c3c7 --- /dev/null +++ b/flow/algorithms/imitation_learning/imitating_network.py @@ -0,0 +1,255 @@ +import numpy as np +import tensorflow as tf +from flow.algorithms.imitation_learning.keras_utils import build_neural_net_deterministic, build_neural_net_stochastic, get_loss, negative_log_likelihood_loss +from flow.algorithms.imitation_learning.replay_buffer import ReplayBuffer + + +class ImitatingNetwork(): + """ + Class containing neural network which learns to imitate a given expert controller. + """ + + def __init__(self, sess, action_dim, obs_dim, fcnet_hiddens, replay_buffer_size, learning_rate, stochastic=False, variance_regularizer = 0, load_model=False, load_path='', tensorboard_path=''): + + """Initializes and constructs neural network. 
+ Parameters + ---------- + sess : tf.Session + Tensorflow session variable + action_dim : int + action_space dimension + obs_dim : int + dimension of observation space (size of network input) + fcnet_hiddens : list + list of hidden layer sizes for fully connected network (length of list is number of hidden layers) + replay_buffer_size: int + maximum size of replay buffer used to hold data for training + stochastic: bool + indicates if network outputs a stochastic (MV Gaussian) or deterministic policy + variance_regularizer: float + regularization hyperparameter to penalize high variance policies + load_model: bool + if True, load model from path specified in load_path + load_path: String + path to h5 file containing model to load. + + """ + + self.sess = sess + self.action_dim = action_dim + self.obs_dim = obs_dim + self.fcnet_hiddens = fcnet_hiddens + self.stochastic=stochastic + self.variance_regularizer = variance_regularizer + self.learning_rate = learning_rate + + self.train_steps = 0 + self.action_steps = 0 + + self.writer = tf.summary.FileWriter(tensorboard_path, tf.get_default_graph()) + + # load network if specified, or construct network + if load_model: + self.load_network(load_path) + else: + self.build_network() + self.compile_network() + + self.replay_buffer = ReplayBuffer(replay_buffer_size) + + def build_network(self): + """ + Defines neural network for choosing actions. Defines placeholders and forward pass + """ + # setup placeholders for network input and labels for training, and hidden layers/output + if self.stochastic: + self.model = build_neural_net_stochastic(self.obs_dim, self.action_dim, self.fcnet_hiddens) + else: + self.model = build_neural_net_deterministic(self.obs_dim, self.action_dim, self.fcnet_hiddens) + + + def compile_network(self): + """ + Compiles Keras network with appropriate loss and optimizer + """ + loss = get_loss(self.stochastic, self.variance_regularizer) + self.model.compile(loss=loss, optimizer=tf.keras.optimizers.Adam(learning_rate=self.learning_rate)) + + + def train(self, observation_batch, action_batch): + """ + Executes one training (gradient) step for the given batch of observation and action data + + Parameters + ---------- + observation_batch : numpy array + numpy array containing batch of observations (inputs) + action_batch : numpy array + numpy array containing batch of actions (labels) + """ + + # reshape action_batch to ensure a shape (batch_size, action_dim) + action_batch = action_batch.reshape(action_batch.shape[0], self.action_dim) + # one gradient step on batch + loss = self.model.train_on_batch(observation_batch, action_batch) + + # tensorboard + summary = tf.Summary(value=[tf.Summary.Value(tag="imitation training loss", simple_value=loss), ]) + self.writer.add_summary(summary, global_step=self.train_steps) + self.train_steps += 1 + + def get_accel_from_observation(self, observation): + """ + Gets the network's acceleration prediction based on given observation/state + + Parameters + ---------- + observation : numpy array + numpy array containing a single observation + + Returns + ------- + numpy array + one element numpy array containing acceleration + """ + + # network expects an array of arrays (matrix); if single observation (no batch), convert to array of arrays + if len(observation.shape)<=1: + observation = observation[None] + # "batch size" is 1, so just get single acceleration/acceleration vector + network_output = self.model.predict(observation) + if self.stochastic: + mean, log_std = network_output[:, 
:self.action_dim], network_output[:, self.action_dim:] + var = np.exp(2 * log_std) + + # track variance norm on tensorboard + variance_norm = np.linalg.norm(var) + summary = tf.Summary(value=[tf.Summary.Value(tag="Variance norm", simple_value=variance_norm), ]) + self.writer.add_summary(summary, global_step=self.action_steps) + + # var is a 1 x d numpy array, where d is the dimension of the action space, so get the first element and form cov matrix + cov_matrix = np.diag(var[0]) + action = np.random.multivariate_normal(mean[0], cov_matrix) + + self.action_steps += 1 + return action + else: + self.action_steps += 1 + return network_output + + + def get_accel(self, env): + """ + Get network's acceleration prediction(s) based on given env + + Parameters + ---------- + env : + environment object + + Returns + ------- + numpy array + one element numpy array containing accleeration + + """ + observation = env.get_state() + return self.get_accel_from_observation(observation) + + + def add_to_replay_buffer(self, rollout_list): + """ + Add data to a replay buffer + + Parameters + ---------- + rollout_list : list + list of rollout dictionaries + """ + + self.replay_buffer.add_rollouts(rollout_list) + + + def sample_data(self, batch_size): + """ + Sample a batch of data from replay buffer. + + Parameters + ---------- + batch_size : int + size of batch to sample + """ + + return self.replay_buffer.sample_batch(batch_size) + + def save_network(self, save_path): + """ + Save imitation network as a h5 file in save_path + + Parameters + ---------- + save_path : String + path to h5 file to save to + """ + + self.model.save(save_path) + # tensorboard + + # writer = tf.summary.FileWriter('./graphs2', tf.get_default_graph()) + + def load_network(self, load_path): + """ + Load imitation network from a h5 file in load_path + + Parameters + ---------- + load_path : String + path to h5 file containing model to load from + """ + if self.stochastic: + self.model = tf.keras.models.load_model(load_path, custom_objects={'nll_loss': negative_log_likelihood_loss(self.variance_regularizer)}) + else: + self.model = tf.keras.models.load_model(load_path) + + + def save_network_PPO(self, save_path): + """ + Build a model, with same policy architecture as imitation network, to run PPO, copy weights from imitation, and save this model. 
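Example (a sketch; `net` is an ImitatingNetwork instance and the file paths are
placeholders):

    net.save_network("imitation_policy.h5")   # imitation policy only
    net.save_network_PPO("ppo_weights.h5")    # policy + value-function heads for a PPO warm start
    # keras_utils.compare_weights can then be used to check that the policy weights
    # copied into the PPO model match the imitation model.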
+ + Parameters + ---------- + load_path : save_path + path to h5 file to save to + """ + + input = tf.keras.layers.Input(self.model.input.shape[1].value) + curr_layer = input + + # number of hidden layers + num_layers = len(self.model.layers) - 2 + + # build layers for policy + for i in range(num_layers): + size = self.model.layers[i + 1].output.shape[1].value + activation = tf.keras.activations.serialize(self.model.layers[i + 1].activation) + curr_layer = tf.keras.layers.Dense(size, activation=activation, name="policy_hidden_layer_{}".format(i + 1))(curr_layer) + output_layer_policy = tf.keras.layers.Dense(self.model.output.shape[1].value, activation=None, name="policy_output_layer")(curr_layer) + + # build layers for value function + curr_layer = input + for i in range(num_layers): + size = self.fcnet_hiddens[i] + curr_layer = tf.keras.layers.Dense(size, activation="tanh", name="vf_hidden_layer_{}".format(i+1))(curr_layer) + output_layer_vf = tf.keras.layers.Dense(1, activation=None, name="vf_output_layer")(curr_layer) + + ppo_model = tf.keras.Model(inputs=input, outputs=[output_layer_policy, output_layer_vf], name="ppo_model") + + # set the policy weights to those learned from imitation + for i in range(num_layers): + policy_layer = ppo_model.get_layer(name="policy_hidden_layer_{}".format(i + 1)) + policy_layer.set_weights(self.model.layers[i + 1].get_weights()) + policy_output = ppo_model.get_layer("policy_output_layer") + policy_output.set_weights(self.model.layers[-1].get_weights()) + + # save the model (as a h5 file) + ppo_model.save(save_path) diff --git a/flow/algorithms/imitation_learning/keras_utils.py b/flow/algorithms/imitation_learning/keras_utils.py new file mode 100644 index 000000000..f5d9924b8 --- /dev/null +++ b/flow/algorithms/imitation_learning/keras_utils.py @@ -0,0 +1,133 @@ +import tensorflow as tf +import tensorflow_probability as tfp +from tensorflow.keras import Input +from tensorflow.keras.layers import Dense + +def build_neural_net_deterministic(input_dim, action_dim, fcnet_hiddens): + """Build a keras model to output a deterministic policy. + Parameters + ---------- + input_dim : int + dimension of input layer + action_dim : int + action_space dimension + fcnet_hiddens : list + list containing size of each hidden layer (length of list is number of hidden layers) + + Returns + ------- + Keras model (untrained) + """ + + input_layer = Input(shape=(input_dim, )) + curr_layer = input_layer + + for i in range(len(fcnet_hiddens)): + size = fcnet_hiddens[i] + dense = Dense(size, activation="tanh") + curr_layer = dense(curr_layer) + output_layer = Dense(action_dim, activation=None)(curr_layer) + model = tf.keras.Model(inputs=input_layer, outputs=output_layer, name="policy_network") + + return model + +def build_neural_net_stochastic(input_dim, action_dim, fcnet_hiddens): + """Build a keras model to output a stochastic policy. 
+ Parameters + ---------- + input_dim : int + dimension of input layer + action_dim : int + action_space dimension + fcnet_hiddens : list + list containing size of each hidden layer (length of list is number of hidden layers) + + Returns + ------- + Keras model (untrained) + """ + input_layer = Input(shape=(input_dim, )) + curr_layer = input_layer + + for i in range(len(fcnet_hiddens)): + size = fcnet_hiddens[i] + dense = Dense(size, activation="tanh") + curr_layer = dense(curr_layer) + + out = Dense(2 * action_dim, activation=None)(curr_layer) + model = tf.keras.Model(inputs=input_layer, outputs=out, name="policy_network") + + return model + +def get_loss(stochastic, variance_regularizer): + """Get appropriate loss function for training. + Parameters + ---------- + stochastic : bool + determines if policy to be learned is deterministic or stochastic + variance_regularizer : float + regularization hyperparameter to penalize high variance policies + + Returns + ------- + Keras loss function to use for imitation learning. + """ + if stochastic: + return negative_log_likelihood_loss(variance_regularizer) + else: + return tf.keras.losses.mean_squared_error + +def negative_log_likelihood_loss(variance_regularizer): + """Negative log likelihood loss for learning stochastic policies. + + Parameters + ---------- + variance_regularizer : float + regularization hyperparameter to penalize high variance policies + Returns + ------- + Negative log likelihood loss function with variance regularization. + """ + + def nll_loss(y, network_output): + assert network_output.shape[1] % 2 == 0, "Stochastic policies must output vectors of even length" + + action_dim = network_output.shape[1] // 2 + + # first half of network_output is mean, second half is log_std + means, log_stds = tf.split(network_output, 2, axis=1) + stds = tf.math.exp(log_stds) + # variances = tf.math.square(stds) + + # Multivariate Gaussian distribution + dist = tfp.distributions.MultivariateNormalDiag(loc=means, scale_diag=stds) + loss = dist.log_prob(y) + loss = tf.negative(loss) + loss = tf.reduce_mean(loss) + (variance_regularizer * tf.norm(stds)) + return loss + + return nll_loss + +def compare_weights(ppo_model, imitation_path): + imitation_model = tf.keras.models.load_model(imitation_path, custom_objects={'nll_loss': negative_log_likelihood_loss(0.5)}) + + for i in range(len(imitation_model.layers) - 2): + ppo_name = 'policy_hidden_layer_' + str(i + 1) + ppo_layer = ppo_model.get_layer(ppo_name) + im_layer = imitation_model.layers[i + 1] + + ppo_weights = ppo_layer.get_weights() + im_weights = im_layer.get_weights() + for i in range(len(ppo_weights)): + assert (ppo_weights[i] == im_weights[i]).all(), "Weights don't match!" + + ppo_layer = ppo_model.get_layer('policy_output_layer') + im_layer = imitation_model.layers[-1] + ppo_weights = ppo_layer.get_weights() + im_weights = im_layer.get_weights() + for i in range(len(ppo_weights)): + assert (ppo_weights[i] == im_weights[i]).all(), "Weights don't match!" 
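    # For reference, the loss returned by nll_loss above is, informally,
    #
    #     loss = -mean_i log N(y_i | mu_i, diag(sigma_i^2)) + variance_regularizer * ||sigma||
    #
    # where the network output is split into [mu, log sigma] halves. A quick sanity
    # check on a toy batch (action_dim = 1, mean 0, log_std 0) might look like:
    #
    #     loss_fn = negative_log_likelihood_loss(0.5)
    #     loss_fn(tf.constant([[0.1]]), tf.constant([[0.0, 0.0]]))
    #     # ~= 0.5 * log(2 * pi) + 0.5 * 0.1 ** 2 + 0.5 * 1.0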
+ + print("\n\nWeights properly loaded\n\n") + + diff --git a/flow/algorithms/imitation_learning/ppo_model.py b/flow/algorithms/imitation_learning/ppo_model.py new file mode 100644 index 000000000..47ae61f77 --- /dev/null +++ b/flow/algorithms/imitation_learning/ppo_model.py @@ -0,0 +1,130 @@ + +from ray.rllib.models.tf.tf_modelv2 import TFModelV2 +from flow.algorithms.imitation_learning.keras_utils import * + + +class PPONetwork(TFModelV2): + """ + Custom RLLib PPOModel (using tensorflow keras) to load weights from a pre-trained policy model (e.g. from imitation learning) and start RL training with loaded weights. + Subclass of TFModelV2. See https://docs.ray.io/en/master/rllib-models.html. + """ + + def __init__(self, obs_space, action_space, num_outputs, model_config, name, **kwargs): + """ + Parameters + __________ + obs_space: gym.Space + observation space of gym environment + action_space: gym.Space + action_space of gym environment + num_outputs: int + number of outputs for policy network. For deterministic policies, this is dimension of the action space. For continuous stochastic policies, this is 2 * dimension of the action space + model_config: dict + configuration of model + name: str + name of model + + """ + + super(PPONetwork, self).__init__(obs_space, action_space, num_outputs, model_config, name) + + h5_path = kwargs.get("h5_load_path", "") + + # setup model with weights loaded in from model in h5 path + self.setup_model(obs_space, action_space, model_config, num_outputs, h5_path) + self.register_variables(self.base_model.variables) + + + + def setup_model(self, obs_space, action_space, model_config, num_outputs, imitation_h5_path): + """ + Loads/builds model for both policy and value function + Parameters + __________ + + obs_space: gym.Space + observation space of env + action_space: gym.Space + action space of env + model_config: dict + configuration parameters for model + num_outputs: int + number of outputs expected for policy + imitation_h5_path: str + path to h5 file containing weights of a pretrained network (empty string if no such file) + """ + + activation = model_config.get("fcnet_activation") + hiddens = model_config.get("fcnet_hiddens", []) + vf_share_layers = model_config.get("vf_share_layers") + + # set up model + inp_layer = tf.keras.layers.Input(shape=obs_space.shape, name="input_layer") + curr_layer = inp_layer + + # hidden layers and output for policy + i = 1 + for size in hiddens: + curr_layer = tf.keras.layers.Dense(size, name="policy_hidden_layer_{}".format(i), + activation=activation)(curr_layer) + i += 1 + + output_layer_policy = tf.keras.layers.Dense(num_outputs, name="policy_output_layer", activation=None)( + curr_layer) + + # set up value function + if not vf_share_layers: + curr_layer = inp_layer + i = 1 + for size in hiddens: + curr_layer = tf.keras.layers.Dense(size, name="vf_hidden_layer_{}".format(i), + activation=activation)(curr_layer) + i += 1 + + output_layer_vf = tf.keras.layers.Dense(1, name="vf_output_layer", activation=None)(curr_layer) + + # build model from layers + self.base_model = tf.keras.Model(inp_layer, [output_layer_policy, output_layer_vf]) + + def forward(self, input_dict, state, seq_lens): + """ + Overrides parent class's method. Used to pass a input through model and get policy/vf output. 
+ Parameters + __________ + input_dict: dict + dictionary of input tensors, including “obs”, “obs_flat”, “prev_action”, “prev_reward”, “is_training” + state: list + list of state tensors with sizes matching those returned by get_initial_state + the batch dimension + seq_lens: tensor + 1d tensor holding input sequence lengths + + Returns + _______ + (outputs, state) + Tuple, first element is policy output, second element state + """ + policy_out, value_out = self.base_model(input_dict["obs_flat"]) + self.value_out = value_out + return policy_out, state + + def value_function(self): + """ + Returns the value function output for the most recent forward pass. + + Returns + _______ + tensor + value estimate tensor of shape [BATCH]. + """ + return tf.reshape(self.value_out, [-1]) + + + def import_from_h5(self, import_file): + """ + Overrides parent class method. Import base_model from h5 import_file. + Parameters: + __________ + import_file: str + filepath to h5 file + """ + self.base_model.load_weights(import_file) diff --git a/flow/algorithms/imitation_learning/replay_buffer.py b/flow/algorithms/imitation_learning/replay_buffer.py new file mode 100644 index 000000000..47ebebaa6 --- /dev/null +++ b/flow/algorithms/imitation_learning/replay_buffer.py @@ -0,0 +1,106 @@ +import time +import numpy as np +import os + + +class ReplayBuffer(object): + """ Replay buffer class to store state, action, expert_action, reward, next_state, terminal tuples""" + + def __init__(self, max_size=100000): + """ + Parameters + __________ + max_size: int + maximum size of replay buffer + """ + + # max size of buffer + self.max_size = max_size + + # store each rollout + self.rollouts = [] + + # store component arrays from each rollout + self.observations = None + self.actions = None + self.expert_actions = None + self.rewards = None + self.next_observations = None + self.terminals = None + + + def add_rollouts(self, rollouts_list): + """ + Add a list of rollouts to the replay buffer + + Parameters + __________ + rollouts_list: list + list of rollout dictionaries + + """ + + for rollout in rollouts_list: + self.rollouts.append(rollout) + + observations, actions, expert_actions, rewards, next_observations, terminals = self.unpack_rollouts(rollouts_list) + + assert (not np.any(np.isnan(expert_actions))), "Invalid actions added to replay buffer" + + # only keep max_size tuples in buffer + if self.observations is None: + self.observations = observations[-self.max_size:] + self.actions = actions[-self.max_size:] + self.expert_actions = expert_actions[-self.max_size:] + self.rewards = rewards[-self.max_size:] + self.next_observations = next_observations[-self.max_size:] + self.terminals = terminals[-self.max_size:] + else: + self.observations = np.concatenate([self.observations, observations])[-self.max_size:] + self.actions = np.concatenate([self.actions, actions])[-self.max_size:] + self.expert_actions = np.concatenate([self.expert_actions, expert_actions])[-self.max_size:] + self.rewards = np.concatenate([self.rewards, rewards])[-self.max_size:] + self.next_observations = np.concatenate([self.next_observations, next_observations])[-self.max_size:] + self.terminals = np.concatenate([self.terminals, terminals])[-self.max_size:] + + def sample_batch(self, batch_size): + """ + Sample a batch of data (with size batch_size) from replay buffer. 
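Example (a sketch with toy shapes; in training, rollout dictionaries with these
keys are produced by the sampling utilities in this package):

    buf = ReplayBuffer(max_size=100000)
    rollout = {"observations": np.zeros((10, 3)), "actions": np.zeros((10, 1)),
               "expert_actions": np.zeros((10, 1)), "rewards": np.zeros(10),
               "next_observations": np.zeros((10, 3)), "terminals": np.zeros(10)}
    buf.add_rollouts([rollout])
    obs, acts, expert_acts = buf.sample_batch(5)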
+ + Parameters + ---------- + batch_size: int + size of batch to sample + + Returns + _______ + Data in separate numpy arrays of observations, actions, and expert actionis + """ + assert self.observations is not None and self.actions is not None and self.expert_actions is not None + + size = len(self.observations) + rand_inds = np.random.randint(0, size, batch_size) + return self.observations[rand_inds], self.actions[rand_inds], self.expert_actions[rand_inds] + + + + def unpack_rollouts(self, rollouts_list): + """ + Convert list of rollout dictionaries to individual observation, action, rewards, next observation, terminal arrays + Parameters + ---------- + rollouts: list + list of rollout dictionaries + + Returns + ---------- + separate numpy arrays of observations, actions, rewards, next_observations, and is_terminals + """ + observations = np.concatenate([rollout["observations"] for rollout in rollouts_list]) + actions = np.concatenate([rollout["actions"] for rollout in rollouts_list]) + expert_actions = np.concatenate([rollout["expert_actions"] for rollout in rollouts_list]) + rewards = np.concatenate([rollout["rewards"] for rollout in rollouts_list]) + next_observations = np.concatenate([rollout["next_observations"] for rollout in rollouts_list]) + terminals = np.concatenate([rollout["terminals"] for rollout in rollouts_list]) + + return observations, actions, expert_actions, rewards, next_observations, terminals diff --git a/flow/algorithms/imitation_learning/run.py b/flow/algorithms/imitation_learning/run.py new file mode 100644 index 000000000..ed8717a5a --- /dev/null +++ b/flow/algorithms/imitation_learning/run.py @@ -0,0 +1,126 @@ +""" +Runner file for imitation learning. This script performs imitation learning using DAgger and also configures the trained +model to conduct further training with Reinforcement Learning (see train_with_imitation.py). + +Usage: + python run.py EXP_CONFIG +""" +from flow.algorithms.imitation_learning.trainer import Trainer + + +class Runner(object): + """ Class to run imitation learning (training and evaluation) """ + + def __init__(self, params): + """ + Parameters + __________ + params: dict + dictionary of parameters relevent to running imitation learning. + """ + + # initialize trainer class instance and params + self.params = params + + # import appropriate exp_config module + if self.params['multiagent']: + module = __import__("examples.exp_configs.rl.multiagent", fromlist=[self.params['exp_config']]) + else: + module = __import__("examples.exp_configs.rl.singleagent", fromlist=[self.params['exp_config']]) + + submodule = getattr(module, self.params['exp_config']) + self.trainer = Trainer(params, submodule) + + def run_training_loop(self): + """ + Runs training for imitation learning for number of iterations specified in params. + """ + self.trainer.run_training_loop() + + def evaluate(self): + """ + Evaluates a trained controller over a specified number trajectories; compares average action per step and average reward per trajectory between imitator and expert + """ + self.trainer.evaluate_controller() + + def save_controller_network(self): + """ + Saves the tensorflow keras model of the imitation policy to a h5 file, whose path is specified in params + """ + self.trainer.save_controller_network() + + def save_controller_for_PPO(self): + """ + Creates and saves (in h5 file format) new tensorflow keras model to run PPO with weighs loaded from imitation learning. This model encapsulates both a policy network and a value function network. 
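The saved h5 file is what train_with_imitation.py later passes to examples/train.py
as --load_weights_path (it sets params['load_weights_path'] = params['PPO_save_path']),
so RLlib's PPONetwork starts from the imitated weights.

Example invocation of this runner (hypothetical exp_config name and file paths):

    python run.py singleagent_straight_road --n_iter 3 --save_model 1 \
        --imitation_save_path imitation.h5 --PPO_save_path ppo.h5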
+ """ + self.trainer.save_controller_for_PPO() + + +def main(): + """ + Parse args, run training, and evaluate. + """ + + import argparse + parser = argparse.ArgumentParser() + + # required input parameters + parser.add_argument( + 'exp_config', type=str, + help='Name of the experiment configuration file, as located in ' + 'exp_configs/rl/singleagent or exp_configs/rl/multiagent.') + + + # rollout collection params + parser.add_argument('--ep_len', type=int, default=5000, help='Max length of episodes for rollouts.') + parser.add_argument('--num_agent_train_steps_per_iter', type=int, default=1000, help='Number of gradient steps for training policy.') # number of gradient steps for training policy + parser.add_argument('--n_iter', type=int, default=3, help='Number of DAgger iterations to run (1st iteration is behavioral cloning') + parser.add_argument('--multiagent', type=bool, default=False, help='If true, env is multiagent.') + parser.add_argument('--v_des', type=float, default=15, help='Desired velocity for follower-stopper') + parser.add_argument('--num_eval_episodes', type=int, default=0, help='Number of episodes on which to evaluate imitation model') + + # imitation training params + parser.add_argument('--batch_size', type=int, default=1000, help='Number of environment steps to collect in iteration of DAgger') + parser.add_argument('--init_batch_size', type=int, default=2000, help='Number of environment steps to collect on 1st iteration of DAgger (behavioral cloning iteration)') + parser.add_argument('--vf_batch_size', type=int, default=2000, help='Number of environment steps to collect to learn value function for a policy') + parser.add_argument('--num_vf_iters', type=int, default=100, help='Number of iterations to run vf training') # TODO: better help description for this + parser.add_argument('--train_batch_size', type=int, default=100, help='Batch size for SGD') + parser.add_argument('--stochastic', type=bool, default=False, help='If true, learn a stochastic policy (MV Gaussian)') + parser.add_argument('--variance_regularizer', type=float, default=0.5, help='Regularization hyperparameter to penalize variance in imitation learning loss, for stochastic policies.') + parser.add_argument('--replay_buffer_size', type=int, default=1000000, help='Max size of replay buffer') + parser.add_argument('--lr', type=float, default=0.001, help='Learning rate for imitation learning and value function learning') + + parser.add_argument('--load_imitation_model', type=bool, default=False, help='Whether to load an existin imitation neural net') + parser.add_argument('--load_imitation_path', type=str, default='', help='Path to h5 file from which to load existing imitation neural net') + parser.add_argument('--save_model', type=int, default=0, help='If true, save both imitation model and PPO model in h5 format') + parser.add_argument('--imitation_save_path', type=str, default='', help='Filepath to h5 file in which imitation model should be saved') + parser.add_argument('--PPO_save_path', type=str, default='', help='Filepath to h5 file in which PPO model with copied weights should be saved') + + # misc params + parser.add_argument('--tensorboard_path', type=str, default='/tensorboard/', help='Path to which tensorboard events should be written.') + + args = parser.parse_args() + + # convert args to dictionary + params = vars(args) + + # change this to determine number and size of hidden layers + params["fcnet_hiddens"] = [32, 32, 32] + + + # run training + runner = Runner(params) + 
runner.run_training_loop() + + # save model after training + if params['save_model'] == 1: + runner.save_controller_network() + runner.save_controller_for_PPO() + + # evaluate controller on difference, compared to expert, in action taken and average reward accumulated per rollout + if params['num_eval_episodes'] > 0: + runner.evaluate() + + +if __name__ == "__main__": + main() diff --git a/flow/algorithms/imitation_learning/train_with_imitation.py b/flow/algorithms/imitation_learning/train_with_imitation.py new file mode 100644 index 000000000..2aae7c2e8 --- /dev/null +++ b/flow/algorithms/imitation_learning/train_with_imitation.py @@ -0,0 +1,171 @@ +from flow.algorithms.imitation_learning.run import * +from examples.train import * + +def parse_args(args): + """Parse training options user can specify in command line. + + Returns + ------- + argparse.Namespace + the output parser object + dict_args + dictionary version of the argparse + """ + + # **** TRAIN.PY ARGS **** + + parser = argparse.ArgumentParser( + formatter_class=argparse.RawDescriptionHelpFormatter, + description="Parse argument used when running a Flow simulation.", + epilog="python train.py EXP_CONFIG EXP_TITLE") + + # required input parameters + parser.add_argument( + 'exp_config', type=str, + help='Name of the experiment configuration file, as located in ' + 'exp_configs/rl/singleagent or exp_configs/rl/multiagent.') + + + parser.add_argument( + 'exp_title', type=str, + help='Title to give the run.') + + + # optional input parameters + parser.add_argument( + '--rl_trainer', type=str, default="rllib", + help='the RL trainer to use. either rllib or Stable-Baselines') + parser.add_argument( + '--algorithm', type=str, default="PPO", + help='RL algorithm to use. Options are PPO, TD3, MATD3 (MADDPG w/ TD3) right now.' + ) + parser.add_argument( + '--num_cpus', type=int, default=1, + help='How many CPUs to use') + parser.add_argument( + '--num_steps', type=int, default=5000, + help='How many total steps to perform learning over. 
Relevant for stable-baselines') + parser.add_argument( + '--grid_search', action='store_true', default=False, + help='Whether to grid search over hyperparams') + parser.add_argument( + '--num_iterations', type=int, default=200, + help='How many iterations are in a training run.') + parser.add_argument( + '--checkpoint_freq', type=int, default=20, + help='How often to checkpoint.') + parser.add_argument( + '--num_rollouts', type=int, default=1, + help='How many rollouts are in a training batch') + parser.add_argument( + '--rollout_size', type=int, default=1000, + help='How many steps are in a training batch.') + parser.add_argument('--use_s3', action='store_true', help='If true, upload results to s3') + parser.add_argument('--local_mode', action='store_true', default=False, + help='If true only 1 CPU will be used') + parser.add_argument('--render', action='store_true', default=False, + help='If true, we render the display') + parser.add_argument( + '--checkpoint_path', type=str, default=None, + help='Directory with checkpoint to restore training from.') + + + # *** IMITATION LEARNING ARGS *** + + # rollout collection params: + parser.add_argument('--ep_len', type=int, default=5000, help='Max length of episodes for rollouts.') + parser.add_argument('--num_agent_train_steps_per_iter', type=int, default=1000, help='Number of gradient steps for training imitation policy.') + parser.add_argument('--n_iter', type=int, default=3, help='Number of DAgger iterations to run (1st iteration is behavioral cloning') + parser.add_argument('--multiagent', type=bool, default=False, help='If true, env is multiagent.') + parser.add_argument('--v_des', type=float, default=15, help='Desired velocity for follower-stopper') + parser.add_argument('--num_eval_episodes', type=int, default=0, help='Number of episodes on which to evaluate imitation model') + + # imitation training params: + parser.add_argument('--batch_size', type=int, default=1000, help='Number of environment steps to collect in iteration of DAgger') + parser.add_argument('--init_batch_size', type=int, default=2000, help='Number of environment steps to collect on 1st iteration of DAgger (behavioral cloning iteration)') + parser.add_argument('--vf_batch_size', type=int, default=2000, help='Number of environment steps to collect to learn value function for a policy') + parser.add_argument('--num_vf_iters', type=int, default=100, help='Number of iterations to run value function learning, after imitation') + parser.add_argument('--train_batch_size', type=int, default=100, help='Batch size to run SGD on during imitation learning.') + parser.add_argument('--variance_regularizer', type=float, default=0.5, help='Regularization hyperparameter to penalize variance in imitation learning negative log-likelihood loss, for stochastic policies.') + parser.add_argument('--stochastic', type=bool, default=True, help='If true, learn a stochastic policy (MV Gaussian). Must be true to continue with PPO training.') + parser.add_argument('--replay_buffer_size', type=int, default=1000000, help='Max size of replay buffer') + parser.add_argument('--lr', type=float, default=0.001, help='Learning rate for imitation learning and value function learning') + + # loading and saving params: + parser.add_argument('--load_imitation_model', type=bool, default=False, help='Whether to load an existing imitation neural network.') + parser.add_argument('--load_imitation_path', type=str, default='', help='Path to h5 file from which to load existing imitation neural net. 
load_imitation_model must be True') + parser.add_argument('--imitation_save_path', type=str, default='', help='Filepath to h5 file in which imitation model should be saved') + parser.add_argument('--PPO_save_path', type=str, default='', help="Filepath to h5 file in which the ppo model should be saved. Before starting PPO training, weights (for both policy and value function) will be loaded from this model") + + # misc + parser.add_argument('--tensorboard_path', type=str, default='/tensorboard/', help='Path to which tensorboard events should be written.') + + parsed_args = parser.parse_known_args(args)[0] + dict_args = vars(parsed_args) + + return parsed_args, dict_args + + + +def main(args): + + # Parse args, train imitation learning + + flags, params = parse_args(args) + + # depth and size of MLP layers + params["fcnet_hiddens"] = [32, 32, 32] + + # load_weights_path for PPO must be set to same path as PPO_save_path (a result from imitation) + params['load_weights_path'] = params["PPO_save_path"] + + + print("\n\n********** IMITATION LEARNING ************ \n") + # run training + imitation_runner = Runner(params) + imitation_runner.run_training_loop() + + # save imitation network + imitation_runner.save_controller_network() + + # save PPO network (contains policy and value function) + imitation_runner.save_controller_for_PPO() + + # Imitation Done, start RL + print("\n\n********** RL ************ \n") + + # Import relevant information from the exp_config script. + module = __import__( + "examples.exp_configs.rl.singleagent", fromlist=[flags.exp_config]) + module_ma = __import__( + "examples.exp_configs.rl.multiagent", fromlist=[flags.exp_config]) + + # Import the sub-module containing the specified exp_config and determine + # whether the environment is single agent or multi-agent. + if hasattr(module, flags.exp_config): + submodule = getattr(module, flags.exp_config) + multiagent = False + elif hasattr(module_ma, flags.exp_config): + submodule = getattr(module_ma, flags.exp_config) + assert flags.rl_trainer.lower() in ["rllib", "h-baselines"], \ + "Currently, multiagent experiments are only supported through "\ + "RLlib. Try running this experiment using RLlib: " \ + "'python train.py EXP_CONFIG'" + multiagent = True + else: + raise ValueError("Unable to find experiment config.") + + # Perform the training operation. 
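+    # Illustrative invocation (not part of the original patch; the exp_config
+    # name and file paths are placeholders):
+    #     python train_with_imitation.py singleagent_ring my_exp_title \
+    #         --n_iter 3 --num_eval_episodes 2 \
+    #         --imitation_save_path /tmp/imitation.h5 \
+    #         --PPO_save_path /tmp/ppo_weights.h5
+    # Imitation learning runs first; PPO then starts from the saved h5 weights
+    # because load_weights_path is set to PPO_save_path above.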
+ if flags.rl_trainer.lower() == "rllib": + train_rllib(submodule, flags) + elif flags.rl_trainer.lower() == "stable-baselines": + train_stable_baselines(submodule, flags) + elif flags.rl_trainer.lower() == "h-baselines": + flow_params = submodule.flow_params + train_h_baselines(flow_params, args, multiagent) + else: + raise ValueError("rl_trainer should be either 'rllib', 'h-baselines', " + "or 'stable-baselines'.") + +if __name__ == "__main__": + main(sys.argv[1:]) diff --git a/flow/algorithms/imitation_learning/trainer.py b/flow/algorithms/imitation_learning/trainer.py new file mode 100644 index 000000000..203eee0b1 --- /dev/null +++ b/flow/algorithms/imitation_learning/trainer.py @@ -0,0 +1,320 @@ +from flow.algorithms.imitation_learning.utils import sample_n_trajectories, sample_trajectories +from flow.utils.registry import make_create_env +from flow.algorithms.imitation_learning.imitating_controller import ImitatingController +from flow.algorithms.imitation_learning.imitating_network import ImitatingNetwork +from flow.algorithms.imitation_learning.utils_tensorflow import * +from flow.algorithms.imitation_learning.keras_utils import * +from flow.controllers.velocity_controllers import FollowerStopper +from flow.core.params import SumoCarFollowingParams + +class Trainer(object): + """ + Class to initialize and run training for imitation learning (with DAgger) + """ + + def __init__(self, params, submodule): + """ + Parameters + __________ + params: dict + Dictionary of parameters used to run imitation learning + submodule: Module + Python module for file containing flow_params + """ + + # get flow params + self.flow_params = submodule.flow_params + + # setup parameters for training + self.params = params + self.sess = create_tf_session() + + # environment setup + create_env, _ = make_create_env(self.flow_params) + self.env = create_env() + + # vehicle setup + self.multiagent = self.params['multiagent'] # multiagent or singleagent env + + if not self.multiagent and self.env.action_space.shape[0] > 1: + # use sorted rl ids if the method exists (e.g.. 
singleagent straightroad)
+            try:
+                self.vehicle_ids = self.env.get_sorted_rl_ids()
+            except AttributeError:
+                self.vehicle_ids = self.env.k.vehicle.get_rl_ids()
+        else:
+            # use get_rl_ids if sorted_rl_ids doesn't exist
+            self.vehicle_ids = self.env.k.vehicle.get_rl_ids()
+
+        # neural net setup
+        obs_dim = self.env.observation_space.shape[0]
+        action_dim = self.env.action_space.shape[0]
+
+        self.params['action_dim'] = action_dim
+        self.params['obs_dim'] = obs_dim
+
+        # initialize neural network class and tf variables
+        self.action_network = ImitatingNetwork(self.sess, self.params['action_dim'], self.params['obs_dim'], self.params['fcnet_hiddens'], self.params['replay_buffer_size'], self.params['lr'], stochastic=self.params['stochastic'], variance_regularizer=self.params['variance_regularizer'], load_model=self.params['load_imitation_model'], load_path=self.params['load_imitation_path'], tensorboard_path=self.params['tensorboard_path'])
+
+        # controllers setup
+        v_des = self.params['v_des']  # for FollowerStopper
+        car_following_params = SumoCarFollowingParams()
+        self.controllers = dict()
+        # initialize controllers: save in a dictionary to avoid re-initializing a controller for a vehicle
+        for vehicle_id in self.vehicle_ids:
+            expert = FollowerStopper(vehicle_id, car_following_params=car_following_params, v_des=v_des)
+            imitator = ImitatingController(vehicle_id, self.action_network, self.multiagent, car_following_params=car_following_params)
+            self.controllers[vehicle_id] = (imitator, expert)
+
+    def run_training_loop(self):
+        """
+        Trains the imitator for self.params['n_iter'] iterations (each iteration collects new trajectories to put in the replay buffer).
+        """
+
+        # number of imitation learning iterations (1st iteration is behavioral cloning)
+        n_iter = self.params['n_iter']
+
+        # init vars at beginning of training
+        # number of environment steps taken throughout training
+        self.total_envsteps = 0
+
+        for itr in range(n_iter):
+            print("\n\n********** Iteration %i ************" % itr)
+
+            # collect trajectories, to be used for training
+            if itr == 0:
+                # first iteration is behavioral cloning
+                training_returns = self.collect_training_trajectories(itr, self.params['init_batch_size'])
+            else:
+                # other iterations use DAgger (trajectories collected by running the imitator policy)
+                training_returns = self.collect_training_trajectories(itr, self.params['batch_size'])
+
+            paths, envsteps_this_batch = training_returns
+            self.total_envsteps += envsteps_this_batch
+
+            # add collected data to the replay buffer in the neural network class
+            self.action_network.add_to_replay_buffer(paths)
+
+            # train controller
+            self.train_controller()
+
+    def collect_training_trajectories(self, itr, batch_size):
+        """
+        Collect (state, action, reward, next_state, terminal) tuples for training.
+
+        Parameters
+        __________
+        itr: int
+            iteration of training during which function is called. 
Used to determine whether to run behavioral cloning or DAgger + batch_size: int + number of tuples to collect + Returns + _______ + paths: list + list of trajectories + envsteps_this_batch: int + the sum over the numbers of environment steps in paths (total number of env transitions in trajectories collected) + """ + + print("\nCollecting data to be used for training...") + max_decel = self.flow_params['env'].additional_params['max_decel'] + trajectories, envsteps_this_batch = sample_trajectories(self.env, self.controllers, self.action_network, batch_size, self.params['ep_len'], self.multiagent, use_expert=itr==0, v_des=self.params['v_des'], max_decel=max_decel) + + return trajectories, envsteps_this_batch + + def train_controller(self): + """ + Trains controller for specified number of steps, using data sampled from replay buffer; each step involves running optimizer (i.e. Adam) once + """ + + print("Training controller using sampled data from replay buffer...") + for train_step in range(self.params['num_agent_train_steps_per_iter']): + # sample data from replay buffer + ob_batch, ac_batch, expert_ac_batch = self.action_network.sample_data(self.params['train_batch_size']) + # train network on sampled data + self.action_network.train(ob_batch, expert_ac_batch) + + def evaluate_controller(self): + """ + Evaluates a trained imitation controller on similarity with expert with respect to action taken and total reward per rollout. + """ + + print("\n\n********** Evaluation ************ \n") + + + # number of trajectories to evaluate performance on + num_trajs = self.params['num_eval_episodes'] + + # collect imitator driven trajectories (along with corresponding expert actions) + trajectories = sample_n_trajectories(self.env, self.controllers, self.action_network, num_trajs, self.params['ep_len'], self.multiagent, False, v_des=self.params['v_des']) + + # initialize metrics + total_imitator_steps = 0 # total number of environment steps taken across the n trajectories + average_imitator_reward_per_rollout = 0 # average reward per rollout achieved by imitator + + action_errors = np.array([]) # difference in action (acceleration) taken between expert and imitator + average_action_expert = 0 # average action taken, across all timesteps, by expert (used to compute % average) + average_action_imitator = 0 # average action taken, across all timesteps, by imitator (used to compute % average) + + # compare actions taken in each step of trajectories (trajectories are controlled by imitator) + for traj_tuple in trajectories: + traj = traj_tuple[0] + traj_len = traj_tuple[1] + + imitator_actions = traj['actions'] + expert_actions = traj['expert_actions'] + + average_action_expert += np.sum(expert_actions) + average_action_imitator += np.sum(imitator_actions) + + # use RMSE as action error metric + action_error = (np.linalg.norm(imitator_actions - expert_actions)) / len(imitator_actions) + action_errors = np.append(action_errors, action_error) + + total_imitator_steps += traj_len + average_imitator_reward_per_rollout += np.sum(traj['rewards']) + + # compute averages for metrics + average_imitator_reward_per_rollout = average_imitator_reward_per_rollout / len(trajectories) + + average_action_expert = average_action_expert / total_imitator_steps + + # collect expert driven trajectories (these trajectories are only used to compare average reward per rollout) + expert_trajectories = sample_n_trajectories(self.env, self.controllers, self.action_network, num_trajs, self.params['ep_len'], self.multiagent, True, 
v_des=self.params['v_des']) + + # initialize metrics + total_expert_steps = 0 + average_expert_reward_per_rollout = 0 + + # compare reward accumulated in trajectories collected via expert vs. via imitator + for traj_tuple in expert_trajectories: + traj = traj_tuple[0] + traj_len = traj_tuple[1] + total_expert_steps += traj_len + average_expert_reward_per_rollout += np.sum(traj['rewards']) + + average_expert_reward_per_rollout = average_expert_reward_per_rollout / len(expert_trajectories) + + # compute percent errors (using expert values as 'ground truth') + percent_error_average_reward = (np.abs(average_expert_reward_per_rollout - average_imitator_reward_per_rollout) / average_expert_reward_per_rollout) * 100 + + percent_error_average_action = (np.abs(np.mean(action_errors)) / np.abs(average_action_expert)) * 100 + + # Print results + print("\nAverage reward per rollout, expert: ", average_expert_reward_per_rollout) + print("Average reward per rollout, imitator: ", average_imitator_reward_per_rollout) + print("% Difference, average reward per rollout: ", percent_error_average_reward, "\n") + + + print(" Average RMSE action error per rollout: ", np.mean(action_errors)) + print("Average Action Taken by Expert: ", average_action_expert) + print("% Action Error: ", percent_error_average_action, "\n") + print("Total imitator steps: ", total_imitator_steps) + print("Total expert steps: ", total_expert_steps) + + def learn_value_function(self, num_samples, num_iterations, num_grad_steps): + """ + Learn the value function under imitation policy. + Parameters + __________ + num_samples: number of environment transition samples to collect to learn from + num_iterations: number of iterations to relabel data, and train + num_grad_steps: number of gradient steps per training iteration + + Returns + _______ + Value function neural net + """ + + print("\n\n********** Learning value function of imitation policy ************ \n") + # init value function neural net + vf_net = build_neural_net_deterministic(self.params['obs_dim'], 1, self.params['fcnet_hiddens']) + vf_net.compile(loss='mean_squared_error', optimizer = tf.keras.optimizers.Adam(learning_rate=self.params['lr'])) + + max_decel = self.flow_params['env'].additional_params['max_decel'] + # collect trajectory samples to train on + trajectories, envsteps_this_batch = sample_trajectories(self.env, self.controllers, self.action_network, + num_samples, self.params['ep_len'], self.multiagent, + use_expert=False, v_des=self.params['v_des'], + max_decel=max_decel) + + # combine trajectories into one + observations = np.concatenate([traj['observations'] for traj in trajectories]) + rewards = np.concatenate([traj['rewards'] for traj in trajectories]) + next_observations = np.concatenate([traj['next_observations'] for traj in trajectories]) + + # iterate over data multiple times (labels change every iteration) + for i in range(num_iterations): + # form labels + next_state_value_preds = vf_net.predict(next_observations).flatten() + next_state_value_preds[np.isnan(next_state_value_preds)] = 0 + labels = rewards + next_state_value_preds + vf_net.fit(observations, labels, verbose=0) + + return vf_net + + + + def save_controller_for_PPO(self): + """ + Build a model, with same policy architecture as imitation network, to run PPO, copy weights from imitation, and save this model. 
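+        The resulting Keras model has two heads: the policy layers are copied from the imitation network, and the value-function layers are taken from the network returned by learn_value_function. The combined model is saved as an h5 file at params['PPO_save_path'], which PPO later loads via the load_weights_path option.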
+ + """ + + # filepath to h5 file in which keras model will be saved + PPO_save_path = self.params['PPO_save_path'] + + vf_net = self.learn_value_function(self.params['vf_batch_size'], self.params['num_vf_iters'], self.params['num_agent_train_steps_per_iter']) + + input = tf.keras.layers.Input(self.action_network.model.input.shape[1].value) + curr_layer = input + + # number of hidden layers + num_layers = len(self.action_network.model.layers) - 2 + + # build layers for policy + for i in range(num_layers): + size = self.action_network.model.layers[i + 1].output.shape[1].value + activation = tf.keras.activations.serialize(self.action_network.model.layers[i + 1].activation) + curr_layer = tf.keras.layers.Dense(size, activation=activation, name="policy_hidden_layer_{}".format(i + 1))(curr_layer) + output_layer_policy = tf.keras.layers.Dense(self.action_network.model.output.shape[1].value, activation=None, name="policy_output_layer")(curr_layer) + + # build layers for value function + curr_layer = input + for i in range(num_layers): + size = self.params['fcnet_hiddens'][i] + curr_layer = tf.keras.layers.Dense(size, activation="tanh", name="vf_hidden_layer_{}".format(i+1))(curr_layer) + output_layer_vf = tf.keras.layers.Dense(1, activation=None, name="vf_output_layer")(curr_layer) + + ppo_model = tf.keras.Model(inputs=input, outputs=[output_layer_policy, output_layer_vf], name="ppo_model") + + # set the policy weights to those learned from imitation + for i in range(num_layers): + policy_layer = ppo_model.get_layer(name="policy_hidden_layer_{}".format(i + 1)) + policy_layer.set_weights(self.action_network.model.layers[i + 1].get_weights()) + policy_output = ppo_model.get_layer("policy_output_layer") + policy_output.set_weights(self.action_network.model.layers[-1].get_weights()) + + # set value function weights to those learned + num_vf_layers = len(vf_net.layers) - 2 + for i in range(num_vf_layers): + vf_layer = ppo_model.get_layer('vf_hidden_layer_{}'.format(i + 1)) + vf_layer.set_weights(vf_net.layers[i + 1].get_weights()) + vf_output = ppo_model.get_layer("vf_output_layer") + vf_output.set_weights(vf_net.layers[-1].get_weights()) + + + # save the model (as a h5 file) + ppo_model.save(PPO_save_path) + + + def save_controller_network(self): + """ + Saves a keras tensorflow model to the specified path given in the command line params. Path must end with .h5. + """ + + imitation_save_path = self.params['imitation_save_path'] + print("Saving tensorflow model to: ", imitation_save_path) + self.action_network.save_network(imitation_save_path) diff --git a/flow/algorithms/imitation_learning/utils.py b/flow/algorithms/imitation_learning/utils.py new file mode 100644 index 000000000..cb75ccc19 --- /dev/null +++ b/flow/algorithms/imitation_learning/utils.py @@ -0,0 +1,369 @@ +import tensorflow as tf +import os +import numpy as np +import math +from flow.core.params import SumoCarFollowingParams +from flow.algorithms.imitation_learning.imitating_controller import ImitatingController +from flow.algorithms.imitation_learning.imitating_network import ImitatingNetwork +from flow.controllers.car_following_models import IDMController +from flow.controllers.velocity_controllers import FollowerStopper +from flow.core.rewards import * + +""" Class agnostic helper functions """ + +def sample_trajectory_singleagent(env, controllers, action_network, max_trajectory_length, use_expert, v_des, max_decel): + """ + Samples a single trajectory from a singleagent environment. 
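+    If use_expert is True, the expert (FollowerStopper) selects the accelerations and its actions are recorded for behavioral cloning; otherwise the imitator drives while the expert's actions are still queried and stored as labels (DAgger).
+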
+ Parameters + __________ + env: gym.Env + environment + controllers: dict + Dictionary of 2-tuples (Imitating_Controller, Expert_Controller), with keys of vehicle_ids + action_network: ImitatingNetwork + ImitatingNetwork class containing neural net for action prediction + max_trajectory_length: int + maximum steps in a trajectory + use_expert: bool + if True, trajectory is collected using expert policy (for behavioral cloning) + v_des: float + v_des parameter for follower-stopper + max_decel: float + maximum deceleration of environment. Used to determine dummy values to put as labels when environment has less vehicles than the maximum amount. + Returns + _______ + dict + Dictionary of numpy arrays, where matching indeces of each array given (state, action, expert_action, reward, next_state, terminal) tuples + """ + + # reset and initialize arrays to store trajectory + observation = env.reset() + + observations, actions, expert_actions, rewards, next_observations, terminals = [], [], [], [], [], [] + traj_length = 0 + + while True: + + # update vehicle ids: if multidimensional action space, check if env has a sorted_rl_ids method + if env.action_space.shape[0] > 1: + try: + vehicle_ids = env.get_sorted_rl_ids() + except: + vehicle_ids = env.k.vehicle.get_rl_ids() + else: + vehicle_ids = env.k.vehicle.get_rl_ids() + + # no RL actions if no RL vehicles + if len(vehicle_ids) == 0: + observation, reward, done, _ = env.step(None) + if done: + break + continue + + # init controllers if any of vehicle ids are new + # there could be multiple vehicle ids if they all share one state but have different actions + car_following_params = SumoCarFollowingParams() + + for vehicle_id in vehicle_ids: + if vehicle_id not in set(controllers.keys()): + expert = FollowerStopper(vehicle_id, car_following_params=car_following_params, v_des=v_des) + imitator = ImitatingController(vehicle_id, action_network, False, car_following_params=car_following_params) + controllers[vehicle_id] = (imitator, expert) + + + # get the actions given by controllers + action_dim = env.action_space.shape[0] + rl_actions = [] + actions_expert = [] + + invalid_expert_action = False + for i in range(action_dim): + # if max number of RL vehicles is not reached, insert dummy values + if i >= len(vehicle_ids): + # dummy value is -2 * max_decel + ignore_accel = -2 * max_decel + rl_actions.append(ignore_accel) + actions_expert.append(ignore_accel) + else: + imitator = controllers[vehicle_ids[i]][0] + expert = controllers[vehicle_ids[i]][1] + + expert_action = expert.get_action(env) + # catch invalid expert actions + if (expert_action is None or math.isnan(expert_action)): + invalid_expert_action = True + + actions_expert.append(expert_action) + + if use_expert: + if traj_length == 0 and i == 0: + print("Controller collecting trajectory: ", type(expert)) + rl_actions.append(expert_action) + else: + if traj_length == 0 and i == 0: + print("Controller collecting trajectory: ", type(imitator)) + imitator_action = imitator.get_action(env) + rl_actions.append(imitator_action) + + + # invalid action in rl_actions; default to Sumo, ignore sample + if None in rl_actions or np.nan in rl_actions: + observation, reward, done, _ = env.step(None) + terminate_rollout = traj_length == max_trajectory_length or done + if terminate_rollout: + break + continue + # invalid expert action (if rl_actions is expert actions then this would have been caught above)) + if not use_expert and invalid_expert_action: + # throw away sample, but step according to rl_actions + 
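+            # (the expert label here is None/NaN, so this transition cannot be used
+            # for supervision; the imitator's action is still applied so the rollout
+            # can continue)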
observation, reward, done, _ = env.step(rl_actions) + terminate_rollout = traj_length == max_trajectory_length or done + if terminate_rollout: + break + continue + + # update collected data + observations.append(observation) + actions.append(rl_actions) + expert_actions.append(actions_expert) + observation, reward, done, _ = env.step(rl_actions) + + traj_length += 1 + next_observations.append(observation) + rewards.append(reward) + terminate_rollout = (traj_length == max_trajectory_length) or done + terminals.append(terminate_rollout) + + if terminate_rollout: + break + + return traj_dict(observations, actions, expert_actions, rewards, next_observations, terminals), traj_length + + +def sample_trajectory_multiagent(env, controllers, action_network, max_trajectory_length, use_expert, v_des): + """ + Samples a single trajectory from a multiagent environment. + + Parameters + __________ + env: gym.Env + environment + controllers: dict + Dictionary of 2-tuples (Imitating_Controller, Expert_Controller), with keys of vehicle_ids + action_network: ImitatingNetwork + ImitatingNetwork class containing neural net for action prediction + max_trajectory_length: int + maximum steps in a trajectory + use_expert: bool + if True, trajectory is collected using expert policy (for behavioral cloning) + v_des: float + v_des parameter for follower-stopper + Returns + _______ + dict + Dictionary of numpy arrays, where matching indeces of each array given (state, action, expert_action, reward, next_state, terminal) tuples + """ + + observation_dict = env.reset() + + observations, actions, expert_actions, rewards, next_observations, terminals = [], [], [], [], [], [] + traj_length = 0 + + while True: + + vehicle_ids = list(observation_dict.keys()) + # add nothing to replay buffer if no vehicles + if len(vehicle_ids) == 0: + observation_dict, reward, done, _ = env.step(None) + if done['__all__']: + break + continue + + # actions taken by collecting controller + rl_actions = dict() + invalid_expert_action = False + # actions taken by expert + expert_action_dict= dict() + + for i in range(len(vehicle_ids)): + vehicle_id = vehicle_ids[i] + + if vehicle_id not in set(controllers.keys()): + car_following_params = SumoCarFollowingParams() + + expert = FollowerStopper(vehicle_id, car_following_params=car_following_params, v_des=v_des) + imitator = ImitatingController(vehicle_id, action_network, True, car_following_params=car_following_params) + controllers[vehicle_id] = (imitator, expert) + + expert_controller = controllers[vehicle_id][1] + if use_expert: + controller = expert_controller + else: + controller = controllers[vehicle_id][0] + + if traj_length == 0 and i == 0: + print("Controller collecting trajectory: ", controller) + + action = controller.get_action(env) + + + # action should be a scalar acceleration + if type(action) == np.ndarray: + action = action.flatten()[0] + + expert_action = expert_controller.get_action(env) + expert_action_dict[vehicle_id] = expert_action + + if (expert_action is None or math.isnan(expert_action)): + invalid_expert_action = True + + rl_actions[vehicle_id] = action + + if invalid_expert_action: + # invalid action in rl_actions, so default control to SUMO + observation_dict, reward_dict, done_dict, _ = env.step(None) + terminate_rollout = traj_length == max_trajectory_length or done_dict['__all__'] + if terminate_rollout: + break + continue + + for vehicle_id in vehicle_ids: + observations.append(observation_dict[vehicle_id]) + actions.append(rl_actions[vehicle_id]) + 
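+            # the expert's action is stored as the supervision label for this state (DAgger-style relabeling)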
expert_actions.append(expert_action_dict[vehicle_id]) + + observation_dict, reward_dict, done_dict, _ = env.step(rl_actions) + terminate_rollout = done_dict['__all__'] or (traj_length == max_trajectory_length) + + for vehicle_id in vehicle_ids: + # default next observation to nans + next_observations.append(observation_dict.get(vehicle_id, np.empty((env.observation_space.shape[0], )))) + rewards.append(reward_dict.get(vehicle_id, 0)) + terminals.append(terminate_rollout) + + traj_length += 1 + + if terminate_rollout: + break + + return traj_dict(observations, actions, expert_actions, rewards, next_observations, terminals), traj_length + + +def sample_trajectories(env, controllers, action_network, min_batch_timesteps, max_trajectory_length, multiagent, use_expert, v_des=15, max_decel=4.5): + """ + Samples trajectories from environment. + + Parameters + __________ + env: gym.Env + environment + controllers: dict + Dictionary of 2-tuples (Imitating_Controller, Expert_Controller), with keys of vehicle_ids + action_network: ImitatingNetwork + ImitatingNetwork class containing neural net for action prediction + min_batch_timesteps: int + minimum number of env transitions to collect + max_trajectory_length: int + maximum steps in a trajectory + multiagent: bool + if True, env is a multiagent env + use_expert: bool + if True, trajectory is collected using expert policy (for behavioral cloning) + v_des: float + v_des parameter for follower-stopper + max_decel: float + maximum deceleration of environment. Used to determine dummy values to put as labels when environment has less vehicles than the maximum amount. + + Returns + _______ + dict, int + Dictionary of trajectory numpy arrays, where matching indeces of each array given (state, action, expert_action, reward, next_state, terminal) tuples + Total number of env transitions seen over trajectories + """ + total_envsteps = 0 + trajectories = [] + + while total_envsteps < min_batch_timesteps: + + if multiagent: + trajectory, traj_length = sample_trajectory_multiagent(env, controllers, action_network, max_trajectory_length, use_expert, v_des) + else: + trajectory, traj_length = sample_trajectory_singleagent(env, controllers, action_network, max_trajectory_length, use_expert, v_des, max_decel) + + trajectories.append(trajectory) + + total_envsteps += traj_length + + return trajectories, total_envsteps + +def sample_n_trajectories(env, controllers, action_network, n, max_trajectory_length, multiagent, use_expert, v_des=15, max_decel=4.5): + """ + Samples n trajectories from environment. + + Parameters + __________ + env: gym.Env + environment + controllers: dict + Dictionary of 2-tuples (Imitating_Controller, Expert_Controller), with keys of vehicle_ids + action_network: ImitatingNetwork + ImitatingNetwork class containing neural net for action prediction + n: int + number of trajectories to collect + max_trajectory_length: int + maximum steps in a trajectory + multiagent: bool + if True, env is a multiagent env + use_expert: bool + if True, trajectory is collected using expert policy (for behavioral cloning) + v_des: float + v_des parameter for follower-stopper + max_decel: float + maximum deceleration of environment. Used to determine dummy values to put as labels when environment has less vehicles than the maximum amount. 
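+
+    Example (illustrative; assumes env, controllers and action_network are already constructed as in Trainer):
+        trajectories = sample_n_trajectories(env, controllers, action_network, 5, 500, False, False)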
+
+    Returns
+    _______
+    list
+        list of (trajectory dict, trajectory length) tuples; in each trajectory dict, matching indices of the arrays give (state, action, expert_action, reward, next_state, terminal) tuples
+    """
+
+    trajectories = []
+    for _ in range(n):
+
+        if multiagent:
+            trajectory, length = sample_trajectory_multiagent(env, controllers, action_network, max_trajectory_length, use_expert, v_des)
+        else:
+            trajectory, length = sample_trajectory_singleagent(env, controllers, action_network, max_trajectory_length, use_expert, v_des, max_decel)
+
+        trajectories.append((trajectory, length))
+
+    return trajectories
+
+
+def traj_dict(observations, actions, expert_actions, rewards, next_observations, terminals):
+    """
+    Collects observation, action, expert_action, reward, next observation, and terminal lists (collected over a rollout) into a single rollout dictionary.
+
+    Parameters
+    __________
+    observations: list
+        list of observations; ith entry is ith observation
+    actions: list
+        list of actions; ith entry is action taken at ith timestep
+    expert_actions: list
+        list of expert actions; ith entry is the action the expert would have taken at ith timestep
+    rewards: list
+        list of rewards; ith entry is reward received at ith timestep
+    next_observations: list
+        list of next observations; ith entry is the observation transitioned to due to state and action at ith timestep
+    terminals: list
+        list of booleans indicating if rollout ended at that timestep
+
+    Returns
+    _______
+    dict
+        dictionary containing above lists in numpy array form.
+    """
+    return {"observations": np.array(observations),
+            "actions": np.array(actions),
+            "expert_actions": np.array(expert_actions),
+            "rewards": np.array(rewards),
+            "next_observations": np.array(next_observations),
+            "terminals": np.array(terminals)}
diff --git a/flow/algorithms/imitation_learning/utils_tensorflow.py b/flow/algorithms/imitation_learning/utils_tensorflow.py
new file mode 100644
index 000000000..cbbfa633d
--- /dev/null
+++ b/flow/algorithms/imitation_learning/utils_tensorflow.py
@@ -0,0 +1,49 @@
+import numpy as np
+import tensorflow as tf
+
+
+"""Class-agnostic helper functions related to tensorflow."""
+
+def build_neural_net(input_placeholder, output_size, scope, n_layers, size, activation=tf.tanh, output_activation=None):
+    """
+    Builds a feedforward neural network for action prediction.
+
+    Parameters
+    __________
+    input_placeholder: tensor
+        placeholder variable for the state (batch_size, input_size)
+    output_size: int
+        size of the output layer
+    scope: str
+        variable scope of the network
+    n_layers: int
+        number of hidden layers
+    size: int
+        dimension of each hidden layer
+    activation: tf activation function
+        activation function of each hidden layer
+    output_activation: tf activation function
+        activation function of the output layer
+
+    Returns
+    _______
+    output_placeholder: tensor
+        output tensor of the network (the input passed through all hidden layers and the output layer)
+    """
+    output_placeholder = input_placeholder
+    with tf.variable_scope(scope, reuse=tf.AUTO_REUSE):
+        for _ in range(n_layers):
+            output_placeholder = tf.layers.dense(output_placeholder, size, activation=activation)
+        output_placeholder = tf.layers.dense(output_placeholder, output_size, activation=output_activation, name='Output_Layer')
+    return output_placeholder
+
+def create_tf_session():
+    """
+    Creates a CPU-only tf session.
+
+    Returns
+    _______
+    tf.Session
+        new tensorflow session
+    """
+    config = tf.compat.v1.ConfigProto(device_count={'GPU': 0})
+    sess = tf.compat.v1.Session(config=config)
+    return sess
diff --git a/flow/visualize/visualizer_rllib.py b/flow/visualize/visualizer_rllib.py
index ec96e3306..e1f09fafd 100644
--- a/flow/visualize/visualizer_rllib.py
+++ 
b/flow/visualize/visualizer_rllib.py @@ -96,6 +96,14 @@ def visualizer_rllib(args): sys.exit(1) if args.run: agent_cls = get_agent_class(args.run) + elif config['env_config']['run'] == "": + from flow.controllers.imitation_learning.imitation_trainer import Imitation_PPO_Trainable + from flow.controllers.imitation_learning.ppo_model import PPONetwork + from ray.rllib.models import ModelCatalog + agent_cls = get_agent_class("PPO") + ModelCatalog.register_custom_model("imitation_ppo_trainable", Imitation_PPO_Trainable) + ModelCatalog.register_custom_model("PPO_loaded_weights", PPONetwork) + elif config['env_config']['run'] == "": from flow.algorithms.centralized_PPO import CCTrainer, CentralizedCriticModel from ray.rllib.models import ModelCatalog @@ -168,6 +176,7 @@ def visualizer_rllib(args): checkpoint = checkpoint + '/checkpoint-' + args.checkpoint_num agent.restore(checkpoint) + if hasattr(agent, "local_evaluator") and \ os.environ.get("TEST_FLAG") != 'True': env = agent.local_evaluator.env @@ -453,5 +462,6 @@ def create_parser(): if __name__ == '__main__': parser = create_parser() args = parser.parse_args() - ray.init(num_cpus=1) + print("GEN EMISSION: ", args.gen_emission) + ray.init(local_mode=True) visualizer_rllib(args) diff --git a/tests/fast_tests/test_environment_base_class.py b/tests/fast_tests/test_environment_base_class.py index ee815393c..b5c6cbc17 100644 --- a/tests/fast_tests/test_environment_base_class.py +++ b/tests/fast_tests/test_environment_base_class.py @@ -13,9 +13,8 @@ from tests.setup_scripts import ring_road_exp_setup, highway_exp_setup import os -import gym.spaces as spaces -from gym.spaces.box import Box import numpy as np +import gym.spaces as spaces os.environ["TEST_FLAG"] = "True" @@ -26,41 +25,6 @@ YELLOW = (255, 255, 0) -class TestFailRLActionsEnv(Env): - """Test environment designed to fail _apply_rl_actions not-implemented test.""" - - @property - def action_space(self): - """See parent class.""" - return Box(low=0, high=0, shape=(0,), dtype=np.float32) # pragma: no cover - - @property - def observation_space(self): - """See parent class.""" - return Box(low=0, high=0, shape=(0,), dtype=np.float32) # pragma: no cover - - def get_state(self, **kwargs): - """See class definition.""" - return np.array([]) # pragma: no cover - - -class TestFailGetStateEnv(Env): - """Test environment designed to fail get_state not-implemented test.""" - - @property - def action_space(self): - """See parent class.""" - return Box(low=0, high=0, shape=(0,), dtype=np.float32) # pragma: no cover - - @property - def observation_space(self): - """See parent class.""" - return Box(low=0, high=0, shape=(0,), dtype=np.float32) # pragma: no cover - - def _apply_rl_actions(self, rl_actions): - return # pragma: no cover - - class TestShuffle(unittest.TestCase): """ Tests that, at resets, the ordering of vehicles changes while the starting @@ -347,34 +311,28 @@ class TestAbstractMethods(unittest.TestCase): """ def setUp(self): - self.env, self.network, _ = ring_road_exp_setup() - self.sim_params = SumoParams() # FIXME: make ambiguous - self.env_params = EnvParams() + env, network, _ = ring_road_exp_setup() + sim_params = SumoParams() # FIXME: make ambiguous + env_params = EnvParams() + self.env = Env(sim_params=sim_params, + env_params=env_params, + network=network) - def test_abstract_base_class(self): - """Checks that instantiating abstract base class raises an error.""" - with self.assertRaises(TypeError): - Env(sim_params=self.sim_params, - env_params=self.env_params, - 
network=self.network) + def tearDown(self): + self.env.terminate() + self.env = None def test_get_state(self): - """Checks that instantiating without get_state implemented - raises an error. - """ - with self.assertRaises(TypeError): - TestFailGetStateEnv(sim_params=self.sim_params, - env_params=self.env_params, - network=self.network) + """Checks that get_state raises an error.""" + self.assertRaises(NotImplementedError, self.env.get_state) + + def test_compute_reward(self): + """Checks that compute_reward returns 0.""" + self.assertEqual(self.env.compute_reward([]), 0) def test__apply_rl_actions(self): - """Checks that instantiating without _apply_rl_actions - implemented raises an error. - """ - with self.assertRaises(TypeError): - TestFailRLActionsEnv(sim_params=self.sim_params, - env_params=self.env_params, - network=self.network) + self.assertRaises(NotImplementedError, self.env._apply_rl_actions, + rl_actions=None) class TestVehicleColoring(unittest.TestCase): diff --git a/tests/fast_tests/test_examples.py b/tests/fast_tests/test_examples.py index fbd78294d..85548fc51 100644 --- a/tests/fast_tests/test_examples.py +++ b/tests/fast_tests/test_examples.py @@ -26,7 +26,6 @@ flow_params as multiagent_traffic_light_grid from examples.exp_configs.rl.multiagent.multiagent_highway import flow_params as multiagent_highway -from examples.simulate import parse_args as parse_simulate_args from examples.train import parse_args as parse_train_args from examples.train import run_model_stablebaseline as run_stable_baselines_model from examples.train import setup_exps_rllib as setup_rllib_exps diff --git a/tests/fast_tests/test_files/i210_emission.csv b/tests/fast_tests/test_files/i210_emission.csv index ec63cf9cf..d43c115a4 100644 --- a/tests/fast_tests/test_files/i210_emission.csv +++ b/tests/fast_tests/test_files/i210_emission.csv @@ -1,4 +1,4 @@ -x,time,edge_id,eclass,type,PMx,speed,angle,CO,CO2,electricity,noise,lane_number,NOx,distance,route,y,id,fuel,HC,waiting +x,time,edge_id,eclass,type,PMx,speed,angle,CO,CO2,electricity,noise,lane_number,NOx,relative_position,route,y,id,fuel,HC,waiting 485.04,0.8,119257914,HBEFA3/PC_G_EU4,human,0.05,23.0,119.74,3.32,3793.12,0.0,70.29,1,1.17,5.1,route119257914_0,1068.18,flow_00.0,1.63,0.11,0.0 500.91,1.6,119257914,HBEFA3/PC_G_EU4,human,0.0,22.84,119.74,0.0,0.0,0.0,69.9,1,0.0,23.37,route119257914_0,1059.12,flow_00.0,0.0,0.0,0.0 517.1,2.4,119257914,HBEFA3/PC_G_EU4,human,0.15,23.31,119.74,78.83,7435.5,0.0,71.61,1,2.88,42.02,route119257914_0,1049.87,flow_00.0,3.2,0.54,0.0 diff --git a/tests/fast_tests/test_scenarios.py b/tests/fast_tests/test_scenarios.py index 5fccdcb3b..2263f3474 100644 --- a/tests/fast_tests/test_scenarios.py +++ b/tests/fast_tests/test_scenarios.py @@ -5,11 +5,8 @@ from flow.networks import BottleneckNetwork, FigureEightNetwork, \ TrafficLightGridNetwork, HighwayNetwork, RingNetwork, MergeNetwork, \ MiniCityNetwork, MultiRingNetwork -from flow.networks import I210SubNetwork from tests.setup_scripts import highway_exp_setup -import flow.config as config - __all__ = [ "MultiRingNetwork", "MiniCityNetwork" ] @@ -136,7 +133,7 @@ def test_ghost_edge(self): self.assertEqual(env.k.network.speed_limit("highway_0"), 30) # =================================================================== # - # With a ghost edge (300m, 25m/s) # + # With a ghost edge # # =================================================================== # # create the network @@ -147,37 +144,7 @@ def test_ghost_edge(self): "speed_limit": 30, "num_edges": 1, "use_ghost_edge": True, - 
"ghost_speed_limit": 25, - "boundary_cell_length": 300, - }) - ) - env.reset() - - # check the network length - self.assertEqual(env.k.network.length(), 1300.1) - - # check the edge list - self.assertEqual(env.k.network.get_edge_list(), - ["highway_0", "highway_end"]) - - # check the speed limits of the edges - self.assertEqual(env.k.network.speed_limit("highway_0"), 30) - self.assertEqual(env.k.network.speed_limit("highway_end"), 25) - - # =================================================================== # - # With a ghost edge (500m, 10m/s) # - # =================================================================== # - - # create the network - env, _, _ = highway_exp_setup( - net_params=NetParams(additional_params={ - "length": 1000, - "lanes": 4, - "speed_limit": 30, - "num_edges": 1, - "use_ghost_edge": True, - "ghost_speed_limit": 10, - "boundary_cell_length": 500, + "ghost_speed_limit": 25 }) ) env.reset() @@ -191,7 +158,7 @@ def test_ghost_edge(self): # check the speed limits of the edges self.assertEqual(env.k.network.speed_limit("highway_0"), 30) - self.assertEqual(env.k.network.speed_limit("highway_end"), 10) + self.assertEqual(env.k.network.speed_limit("highway_end"), 25) class TestRingNetwork(unittest.TestCase): @@ -254,150 +221,6 @@ def test_additional_net_params(self): ) -class TestI210SubNetwork(unittest.TestCase): - - """Tests I210SubNetwork in flow/networks/i210_subnetwork.py.""" - - def test_additional_net_params(self): - """Ensures that not returning the correct params leads to an error.""" - self.assertTrue( - test_additional_params( - network_class=I210SubNetwork, - additional_params={ - "on_ramp": False, - "ghost_edge": False, - } - ) - ) - - def test_specify_routes(self): - """Validates that the routes are properly specified for the network. - - This is done simply by checking the initial edges routes are specified - from, which alternates based on choice of network configuration. - - This method tests the routes for the following cases: - - 1. on_ramp = False, ghost_edge = False - 2. on_ramp = True, ghost_edge = False - 3. on_ramp = False, ghost_edge = True - 4. 
on_ramp = True, ghost_edge = True - """ - # test case 1 - network = I210SubNetwork( - name='test-3', - vehicles=VehicleParams(), - net_params=NetParams( - template=os.path.join( - config.PROJECT_PATH, - "examples/exp_configs/templates/sumo/test2.net.xml" - ), - additional_params={ - "on_ramp": False, - "ghost_edge": False, - }, - ), - ) - - self.assertEqual( - ['119257914'], - sorted(list(network.specify_routes(network.net_params).keys())) - ) - - del network - - # test case 2 - network = I210SubNetwork( - name='test-3', - vehicles=VehicleParams(), - net_params=NetParams( - template=os.path.join( - config.PROJECT_PATH, - "examples/exp_configs/templates/sumo/test2.net.xml" - ), - additional_params={ - "on_ramp": True, - "ghost_edge": True, - }, - ), - ) - - self.assertEqual( - ['119257908#0', - '119257908#1', - '119257908#1-AddedOffRampEdge', - '119257908#1-AddedOnRampEdge', - '119257908#2', - '119257908#3', - '119257914', - '173381935', - '27414342#0', - '27414342#1-AddedOnRampEdge', - '27414345', - 'ghost0'], - sorted(list(network.specify_routes(network.net_params).keys())) - ) - - del network - - # test case 3 - network = I210SubNetwork( - name='test-3', - vehicles=VehicleParams(), - net_params=NetParams( - template=os.path.join( - config.PROJECT_PATH, - "examples/exp_configs/templates/sumo/test2.net.xml" - ), - additional_params={ - "on_ramp": False, - "ghost_edge": True, - }, - ), - ) - - self.assertEqual( - ['119257914', 'ghost0'], - sorted(list(network.specify_routes(network.net_params).keys())) - ) - - del network - - # test case 4 - network = I210SubNetwork( - name='test-3', - vehicles=VehicleParams(), - net_params=NetParams( - template=os.path.join( - config.PROJECT_PATH, - "examples/exp_configs/templates/sumo/test2.net.xml" - ), - additional_params={ - "on_ramp": True, - "ghost_edge": True, - }, - ), - ) - - self.assertEqual( - ['119257908#0', - '119257908#1', - '119257908#1-AddedOffRampEdge', - '119257908#1-AddedOnRampEdge', - '119257908#2', - '119257908#3', - '119257914', - '173381935', - '27414342#0', - '27414342#1-AddedOnRampEdge', - '27414345', - 'ghost0'], - sorted(list(network.specify_routes(network.net_params).keys())) - ) - - del network - - ############################################################################### # Utility methods # ############################################################################### diff --git a/tests/fast_tests/test_vehicles.py b/tests/fast_tests/test_vehicles.py index 7e1405007..a37b235ff 100644 --- a/tests/fast_tests/test_vehicles.py +++ b/tests/fast_tests/test_vehicles.py @@ -33,7 +33,7 @@ def test_speed_lane_change_modes(self): speed_mode='obey_safe_speed', ), lane_change_params=SumoLaneChangeParams( - lane_change_mode="no_lc_safe", + lane_change_mode="no_lat_collide", ) ) @@ -56,7 +56,7 @@ def test_speed_lane_change_modes(self): self.assertEqual(vehicles.type_parameters["typeB"][ "car_following_params"].speed_mode, 0) self.assertEqual(vehicles.type_parameters["typeB"][ - "lane_change_params"].lane_change_mode, 512) + "lane_change_params"].lane_change_mode, 1621) vehicles.add( "typeC", @@ -89,7 +89,7 @@ def test_controlled_id_params(self): speed_mode="obey_safe_speed", ), lane_change_params=SumoLaneChangeParams( - lane_change_mode="no_lc_safe", + lane_change_mode="no_lat_collide", )) default_mingap = SumoCarFollowingParams().controller_params["minGap"] self.assertEqual(vehicles.types[0]["type_params"]["minGap"], @@ -336,7 +336,6 @@ def test_no_junctions_highway(self): "num_edges": 1, "use_ghost_edge": False, "ghost_speed_limit": 
25, - "boundary_cell_length": 300, } net_params = NetParams(additional_params=additional_net_params) vehicles = VehicleParams() @@ -407,7 +406,6 @@ def test_no_junctions_highway(self): "num_edges": 3, "use_ghost_edge": False, "ghost_speed_limit": 25, - "boundary_cell_length": 300, } net_params = NetParams(additional_params=additional_net_params) vehicles = VehicleParams() @@ -477,7 +475,6 @@ def test_no_junctions_highway(self): "num_edges": 3, "use_ghost_edge": False, "ghost_speed_limit": 25, - "boundary_cell_length": 300, } net_params = NetParams(additional_params=additional_net_params) vehicles = VehicleParams() diff --git a/tests/fast_tests/test_visualizers.py b/tests/fast_tests/test_visualizers.py index 47aa9d968..bc888c498 100644 --- a/tests/fast_tests/test_visualizers.py +++ b/tests/fast_tests/test_visualizers.py @@ -91,7 +91,94 @@ def test_capacity_diagram_generator(self): np.testing.assert_array_almost_equal(std_outflows, expected_stds) def test_time_space_diagram_figure_eight(self): + # check that the exported data matches the expected emission file data + fig8_emission_data = { + 'idm_3': {'pos': [27.25, 28.25, 30.22, 33.17], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.98, 2.95], + 'edge': ['upper_ring', 'upper_ring', 'upper_ring', + 'upper_ring'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_4': {'pos': [56.02, 57.01, 58.99, 61.93], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.98, 2.95], + 'edge': ['upper_ring', 'upper_ring', 'upper_ring', + 'upper_ring'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_5': {'pos': [84.79, 85.78, 87.76, 90.7], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.98, 2.95], + 'edge': ['upper_ring', 'upper_ring', 'upper_ring', + 'upper_ring'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_2': {'pos': [28.77, 29.76, 1.63, 4.58], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.97, 2.95], + 'edge': ['top', 'top', 'upper_ring', 'upper_ring'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_13': {'pos': [106.79, 107.79, 109.77, 112.74], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.98, 2.96], + 'edge': ['lower_ring', 'lower_ring', 'lower_ring', + 'lower_ring'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_9': {'pos': [22.01, 23.0, 24.97, 27.92], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.97, 2.95], + 'edge': ['left', 'left', 'left', 'left'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_6': {'pos': [113.56, 114.55, 116.52, 119.47], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.97, 2.95], + 'edge': ['upper_ring', 'upper_ring', 'upper_ring', + 'upper_ring'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_8': {'pos': [29.44, 0.28, 2.03, 4.78], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.84, 1.76, 2.75], + 'edge': ['right', ':center_0', ':center_0', + ':center_0'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_12': {'pos': [78.03, 79.02, 80.99, 83.94], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.98, 2.95], + 'edge': ['lower_ring', 'lower_ring', 'lower_ring', + 'lower_ring'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_10': {'pos': [20.49, 21.48, 23.46, 26.41], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.98, 2.95], + 'edge': ['lower_ring', 'lower_ring', 'lower_ring', + 'lower_ring'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_11': {'pos': [49.26, 50.25, 52.23, 55.17], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.98, 2.95], + 'edge': ['lower_ring', 'lower_ring', 'lower_ring', + 'lower_ring'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_1': {'pos': [0.0, 0.99, 2.97, 5.91], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 
1.98, 2.95], + 'edge': ['top', 'top', 'top', 'top'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_7': {'pos': [0.67, 1.66, 3.64, 6.58], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 0.99, 1.97, 2.94], + 'edge': ['right', 'right', 'right', 'right'], + 'lane': [0.0, 0.0, 0.0, 0.0]}, + 'idm_0': {'pos': [0.0, 1.0, 2.98, 5.95], + 'time': [1.0, 2.0, 3.0, 4.0], + 'vel': [0.0, 1.0, 1.99, 2.97], + 'edge': ['bottom', 'bottom', 'bottom', 'bottom'], + 'lane': [0.0, 0.0, 0.0, 0.0]} + } dir_path = os.path.dirname(os.path.realpath(__file__)) + actual_emission_data = tsd.import_data_from_emission( + os.path.join(dir_path, 'test_files/fig8_emission.csv')) + self.assertDictEqual(fig8_emission_data, actual_emission_data) + + # test get_time_space_data for figure eight networks flow_params = tsd.get_flow_params( os.path.join(dir_path, 'test_files/fig8.json')) emission_data, _, _, _ = tsd.import_data_from_trajectory( @@ -143,11 +230,22 @@ def test_time_space_diagram_figure_eight(self): [[2., 202.36166941], [3., 411.80333882]], [[3., 411.80333882], [4., 408.85333882]]] ) + expected_speed = np.array([ + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [1, 0.99, 0.99, 0.99, 0.99, 0.99, 0.99, 0.99, 0.99, 0.99, 0.99, + 0.99, 0.84, 0.99], + [1.99, 1.98, 1.98, 1.98, 1.98, 1.98, 1.97, 1.98, 1.98, 1.98, 1.97, + 1.97, 1.76, 1.97] + ]) - np.testing.assert_array_almost_equal(segs, expected_segs) + np.testing.assert_array_almost_equal(pos[:-1, :], expected_pos) + np.testing.assert_array_almost_equal(speed[:-1, :], expected_speed) def test_time_space_diagram_merge(self): dir_path = os.path.dirname(os.path.realpath(__file__)) + emission_data = tsd.import_data_from_emission( + os.path.join(dir_path, 'test_files/merge_emission.csv')) + flow_params = tsd.get_flow_params( os.path.join(dir_path, 'test_files/merge.json')) emission_data, _, _, _ = tsd.import_data_from_trajectory( @@ -162,10 +260,14 @@ def test_time_space_diagram_merge(self): [[8.0000e-01, 7.2487e+02], [1.0000e+00, 7.2502e+02]]] ) - np.testing.assert_array_almost_equal(segs, expected_segs) + np.testing.assert_array_almost_equal(pos, expected_pos) + np.testing.assert_array_almost_equal(speed, expected_speed) def test_time_space_diagram_I210(self): dir_path = os.path.dirname(os.path.realpath(__file__)) + emission_data = tsd.import_data_from_emission( + os.path.join(dir_path, 'test_files/i210_emission.csv')) + module = __import__("examples.exp_configs.non_rl", fromlist=["i210_subnetwork"]) flow_params = getattr(module, "i210_subnetwork").flow_params emission_data, _, _, _ = tsd.import_data_from_trajectory( @@ -206,6 +308,9 @@ def test_time_space_diagram_I210(self): def test_time_space_diagram_ring_road(self): dir_path = os.path.dirname(os.path.realpath(__file__)) + emission_data = tsd.import_data_from_emission( + os.path.join(dir_path, 'test_files/ring_230_emission.csv')) + flow_params = tsd.get_flow_params( os.path.join(dir_path, 'test_files/ring_230.json')) emission_data, _, _, _ = tsd.import_data_from_trajectory( @@ -233,8 +338,22 @@ def test_time_space_diagram_ring_road(self): [[-7.49000000e+01, 4.77352569e+01], [-7.48000000e+01, 4.77510691e+01]], [[-7.48000000e+01, 4.77510691e+01], [-7.47000000e+01, 4.77745504e+01]]] ) - - np.testing.assert_array_almost_equal(segs, expected_segs) + expected_speed = np.array([ + [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], + [0.08, 0.08, 0.08, 0.08, 0.08, 0.08, 0.08, 0.08, 0.08, 0.08, 0.08, + 0.08, 0.08, 0.08, 0.1, 0.08, 0.08, 0.08, 0.08, 0.08, 0.08, 0.08], + [0.16, 0.16, 0.16, 0.16, 0.16, 0.16, 0.16, 0.16, 0.16, 
0.16, 0.16, + 0.16, 0.16, 0.16, 0.2, 0.16, 0.16, 0.16, 0.16, 0.16, 0.16, 0.16], + [0.23, 0.23, 0.23, 0.23, 0.23, 0.23, 0.23, 0.23, 0.23, 0.23, 0.23, + 0.23, 0.23, 0.23, 0.29, 0.23, 0.23, 0.23, 0.23, 0.23, 0.23, 0.23], + [0.31, 0.31, 0.31, 0.31, 0.31, 0.31, 0.31, 0.31, 0.31, 0.31, 0.31, + 0.31, 0.31, 0.31, 0.39, 0.31, 0.31, 0.31, 0.31, 0.31, 0.31, 0.31], + [0.41, 0.41, 0.41, 0.41, 0.41, 0.41, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0] + ]) + + np.testing.assert_array_almost_equal(pos, expected_pos) + np.testing.assert_array_almost_equal(speed, expected_speed) def test_plot_ray_results(self): dir_path = os.path.dirname(os.path.realpath(__file__))