|
70 | 70 | "from fmbench.globals import * \n", |
71 | 71 | "from datetime import datetime\n", |
72 | 72 | "from datetime import timezone\n", |
| 73 | + "from datetime import timedelta\n", |
73 | 74 | "from transformers import AutoTokenizer\n", |
74 | 75 | "from sagemaker.predictor import Predictor\n", |
75 | 76 | "import importlib.resources as pkg_resources\n", |
|
494 | 495 | " sys.modules[module_name] = inference_module\n", |
495 | 496 | " spec.loader.exec_module(inference_module)\n", |
496 | 497 | " # create a predictor from each endpoint in experiments\n", |
497 | | - " return inference_module.create_predictor(ep_name, inference_spec)" |
| 498 | + " metadata: Optional[Dict] = None\n", |
| 499 | + " if ep_info[0].get('endpoint'):\n", |
| 500 | + " production_variants = ep_info[0].get('endpoint').get(\"ProductionVariants\")\n", |
| 501 | + " if production_variants is not None:\n", |
| 502 | + " variant_name = production_variants[0].get(\"VariantName\")\n", |
| 503 | + " metadata = dict(variant_name=variant_name)\n", |
| 504 | + " logger.info(f\"ep_name={ep_name}, variant_name={variant_name}\")\n", |
| 505 | + " logger.info(f\"ep_name={ep_name}, metadata={metadata}\")\n", |
| 506 | + " return inference_module.create_predictor(ep_name, inference_spec, metadata)" |
498 | 507 | ] |
499 | 508 | }, |
500 | 509 | { |
|
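For context, the new `metadata` argument threads the endpoint's production-variant name (read from the endpoint description stored in `ep_info`) down to the predictor, so that endpoint metrics can later be filtered on the CloudWatch `VariantName` dimension. A minimal sketch of how a `create_predictor` implementation could consume it; the signature and attribute names below are assumptions for illustration, not the actual fmbench inference module:

# hypothetical sketch, assuming create_predictor only needs to remember the variant name
from typing import Dict, Optional

from sagemaker.predictor import Predictor

def create_predictor(endpoint_name: str,
                     inference_spec: Optional[Dict],
                     metadata: Optional[Dict]) -> Predictor:
    predictor = Predictor(endpoint_name=endpoint_name)
    # keep the variant name around so that get_metrics can add the
    # VariantName dimension when querying CloudWatch later on
    predictor.variant_name = metadata.get("variant_name") if metadata else None
    predictor.inference_spec = inference_spec
    return predictor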
611 | 620 | "\n", |
612 | 621 | "# dataframe list to hold metrics for each endpoint\n", |
613 | 622 | "df_ep_metrics_list = []\n", |
| 623 | + "# list for holding predictors and run start and end timestamp\n", |
| 624 | + "# because cloud watch metrics are available after a 1-minute delay\n", |
| 625 | + "predictors_and_metrics_timestamp_list = []\n", |
614 | 626 | "\n", |
615 | 627 | "for e_idx, experiment in enumerate(config['experiments']):\n", |
616 | 628 | " # Start timer for the experiment \n", |
|
626 | 638 | " prompt_tokens_total: int = 0\n", |
627 | 639 | " completion_tokens_total: int = 0\n", |
628 | 640 | " for concurrency, payload_file, split_payload in combination_data:\n", |
629 | | - " experiment_at_concurrency_start_dttm = datetime.now()\n", |
| 641 | + " # track time at minute boundaries\n", |
| 642 | + " experiment_at_concurrency_start_dttm = datetime.utcnow().replace(second=0, microsecond=0)\n", |
630 | 643 | " for chunk_index, chunk in enumerate(split_payload):\n", |
631 | 644 | " logger.info(f\"experiment_index={e_idx+1}/{num_experiments}, \"\n", |
632 | 645 | " f\"concurrency={concurrency}, payload_file={payload_file}, \"\n", |
|
666 | 679 | " METRICS_PER_INFERENCE_DIR,\n", |
667 | 680 | " response_file_name)\n", |
668 | 681 | " # save endpoint metrics\n", |
669 | | - " df_ep_metrics = predictor.get_metrics(experiment_at_concurrency_start_dttm,\n", |
670 | | - " datetime.now())\n", |
671 | | - " if df_ep_metrics is not None:\n", |
672 | | - " # we want concurrency after timestamp, endpoint name\n", |
673 | | - " df_ep_metrics.insert(loc=2,\n", |
674 | | - " column='instance_type',\n", |
675 | | - " value=experiment['instance_type'])\n", |
676 | | - " df_ep_metrics.insert(loc=3,\n", |
677 | | - " column='concurrency',\n", |
678 | | - " value=concurrency)\n", |
679 | | - " df_ep_metrics_list.append(df_ep_metrics)\n", |
| 682 | + " experiment_at_concurrency_end_dttm = datetime.utcnow().replace(second=0, microsecond=0)\n", |
| 683 | + "        # if the end time and start time fall within the same minute, move the end time to the next\n", |
| 684 | + "        # minute, otherwise CloudWatch would return an empty response\n", |
| 685 | + " time_delta_in_seconds = (experiment_at_concurrency_end_dttm - experiment_at_concurrency_start_dttm).seconds\n", |
| 686 | + " if time_delta_in_seconds < 60:\n", |
| 687 | + " experiment_at_concurrency_end_dttm += timedelta(seconds=60)\n", |
| 688 | + "\n", |
| 689 | + " predictors_and_metrics_timestamp_list.append((predictor,\n", |
| 690 | + " experiment_at_concurrency_start_dttm,\n", |
| 691 | + " experiment_at_concurrency_end_dttm,\n", |
| 692 | + " concurrency,\n", |
| 693 | + " experiment['instance_type']))\n", |
680 | 694 | "\n", |
681 | 695 | " # Experiment done, stopping the timer for this given experiment\n", |
682 | 696 | " experiment_end_time = time.perf_counter()\n", |
|
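The timestamps are truncated to minute boundaries, and the end of the window is pushed out by one minute whenever both timestamps land in the same minute, because the CloudWatch query made later uses a 1-minute period and a window shorter than a full minute would return an empty response. A small, self-contained illustration of that adjustment (plain datetime arithmetic, no AWS calls):

from datetime import datetime, timedelta, timezone

# truncate to the start of the current minute, mirroring the notebook code
start = datetime.now(timezone.utc).replace(second=0, microsecond=0)
# ... the inferences for this concurrency level would run here ...
end = datetime.now(timezone.utc).replace(second=0, microsecond=0)

# if start and end fall within the same minute, widen the window to one full minute
if (end - start).seconds < 60:
    end += timedelta(seconds=60)
print(start.isoformat(), end.isoformat())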
715 | 729 | " f\"duration={experiment_duration:.6f} seconds, exp_cost={exp_cost:.6f}, done\")" |
716 | 730 | ] |
717 | 731 | }, |
| 732 | + { |
| 733 | + "cell_type": "code", |
| 734 | + "execution_count": null, |
| 735 | + "metadata": {}, |
| 736 | + "outputs": [], |
| 737 | + "source": [ |
| 738 | + "# add a 1-minute sleep to be able to grab the CW metrics from the last run\n", |
| 739 | + "sleep_time: int = 60\n", |
| 740 | + "logger.info(f\"going to sleep for {sleep_time}s before querying metrics from the endpoint\")\n", |
| 741 | + "time.sleep(sleep_time)\n", |
| 742 | + "logger.info(f\"after sleep for {sleep_time}s before querying metrics from the endpoint\")\n", |
| 743 | + "\n", |
| 744 | + "for predictor, \\\n", |
| 745 | + " experiment_at_concurrency_start_dttm, \\\n", |
| 746 | + " experiment_at_concurrency_end_dttm, \\\n", |
| 747 | + " concurrency, \\\n", |
| 748 | + " instance_type in predictors_and_metrics_timestamp_list:\n", |
| 749 | + " # save endpoint metrics\n", |
| 750 | + " df_ep_metrics = predictor.get_metrics(experiment_at_concurrency_start_dttm,\n", |
| 751 | + " experiment_at_concurrency_end_dttm)\n", |
| 752 | + " if df_ep_metrics is not None:\n", |
| 753 | + " # we want concurrency after timestamp, endpoint name\n", |
| 754 | + " df_ep_metrics.insert(loc=2,\n", |
| 755 | + " column='instance_type',\n", |
| 756 | + " value=instance_type)\n", |
| 757 | + " df_ep_metrics.insert(loc=3,\n", |
| 758 | + " column='concurrency',\n", |
| 759 | + " value=concurrency)\n", |
| 760 | + " df_ep_metrics_list.append(df_ep_metrics)" |
| 761 | + ] |
| 762 | + }, |
718 | 763 | { |
719 | 764 | "cell_type": "code", |
720 | 765 | "execution_count": null, |
|
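The deferred collection above assumes the predictor's `get_metrics(start, end)` queries CloudWatch for the endpoint (and, with the new metadata, for the specific variant) over the given window. A hedged sketch of what such a query could look like with boto3; the chosen metric names, statistic, and dataframe shape are assumptions for illustration, not fmbench's actual implementation:

# hypothetical sketch of a CloudWatch-backed get_metrics, not the actual fmbench code
import boto3
import pandas as pd

def get_metrics(endpoint_name, variant_name, start_time, end_time):
    cw = boto3.client("cloudwatch")
    dimensions = [{"Name": "EndpointName", "Value": endpoint_name},
                  {"Name": "VariantName", "Value": variant_name}]
    # one query per metric, all at a 1-minute period
    queries = [{"Id": name.lower(),
                "MetricStat": {"Metric": {"Namespace": "AWS/SageMaker",
                                          "MetricName": name,
                                          "Dimensions": dimensions},
                               "Period": 60,
                               "Stat": "Average"}}
               for name in ["Invocations", "ModelLatency"]]
    resp = cw.get_metric_data(MetricDataQueries=queries,
                              StartTime=start_time,
                              EndTime=end_time)
    # flatten every (timestamp, value) pair into one long dataframe
    rows = [{"timestamp": ts,
             "endpoint_name": endpoint_name,
             "metric": result["Id"],
             "value": val}
            for result in resp["MetricDataResults"]
            for ts, val in zip(result["Timestamps"], result["Values"])]
    return pd.DataFrame(rows) if rows else None

Because the saved start and end timestamps are minute-aligned and at least one minute apart, each query window covers at least one complete 1-minute period, so CloudWatch can return a datapoint for every metric.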