1+ """
2+ Analyze data across multiple fmbench runs
3+ """
4+ import os
5+ import math
6+ import glob
7+ import json
8+ import yaml
9+ import logging
10+ import argparse
11+ import pandas as pd
12+ from pathlib import Path
13+
# Root logger format: timestamp, pid, file:line, level, message.
logging.basicConfig(
    format='[%(asctime)s] p%(process)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Requests-per-minute levels for which instance counts/costs are extrapolated.
RPM_LIST = [1, 10, 100, 1000, 10000]
# Default p95 latency cutoff (seconds); runs above it are filtered out.
LATENCY_THRESHOLD: int = 2
# Directory scanned for results-* folders.
RESULTS_DIR: str = "./"
# Default payload file used for the "payload of interest" summary.
PAYLOAD_FILE_OF_INTEREST: str = "payload_en_3000-3840.jsonl"
# Default model id, used in output file names.
MODEL: str = "llama3-8b-instruct"
# Pricing YAML with per-instance-type hourly cost.
PRICING_FILE_PATH: str = os.path.join("src", "fmbench", "configs", "pricing.yml")
23+
def cost_per_txn(row, pricing):
    """Return the dollar cost of one transaction for this run.

    The instance's hourly price (from the pricing config) is divided by
    the hourly transaction throughput observed in *row*; rounded to
    4 decimal places.
    """
    hourly_price = pricing['pricing']['instance_based'][row['instance_type']]
    hourly_txns = 60 * row['transactions_per_minute']
    return round(hourly_price / hourly_txns, 4)
29+
def cost_per_1k_tokens(row, pricing):
    """Return the dollar cost per 1000 tokens (prompt + completion).

    Hourly token throughput is the mean tokens per transaction times the
    hourly transaction count; the instance's hourly price is spread over
    that and scaled to 1k tokens. Rounded to 8 decimal places.
    """
    hourly_txns = 60 * row['transactions_per_minute']
    tokens_per_txn = row['prompt_token_count_mean'] + row['completion_token_count_mean']
    hourly_tokens = tokens_per_txn * hourly_txns
    hourly_price = pricing['pricing']['instance_based'][row['instance_type']]
    return round(1000 * (hourly_price / hourly_tokens), 8)
36+
# Determine how many instances would be required to serve 100, 1000, or
# 10000 requests/minute. The intuition: at a low aggregate request rate,
# smaller instances that give good latency at low concurrency suffice
# (the larger, pricier instances are overkill there), but as the request
# rate grows there is an inflexion point beyond which so many small
# instances are needed that fewer large instances become the cheaper choice.
def cost_per_n_rpm(r, rpm, pricing):
    """Return (instance_count, hourly_cost) needed to sustain *rpm* requests/minute.

    The instance count is the per-instance throughput from row *r* scaled
    up (ceiling) to cover *rpm*; the cost is that count times the
    instance's hourly price, rounded to 2 decimal places.
    """
    needed = math.ceil(rpm / r['transactions_per_minute'])
    hourly_price = pricing['pricing']['instance_based'][r['instance_type']]
    return (needed, round(needed * hourly_price, 2))
49+
50+
51+
def main():
    """Combine per-run FMBench metrics, compute cost metrics, write summary CSVs.

    Reads every results-*/all_metrics_summary.csv under RESULTS_DIR, keeps
    runs whose p95 latency meets the threshold, keeps the highest-concurrency
    row per (experiment, payload, instance type, instance count), adds
    cost-per-txn / cost-per-1k-tokens / cost-at-N-rpm columns, and writes
    one overall summary CSV plus one for the payload of interest.

    Raises:
        FileNotFoundError: if no all_metrics_summary.csv files are found.
    """
    parser = argparse.ArgumentParser(description='Analyze multiple FMBench runs')
    parser.add_argument('--latency-threshold',
                        type=int,
                        default=LATENCY_THRESHOLD,
                        help=f'Latency threshold, runs with p95 above this are not useful, default={LATENCY_THRESHOLD}',
                        required=False)
    parser.add_argument('--payload-file',
                        type=str,
                        default=PAYLOAD_FILE_OF_INTEREST,
                        help=f'Payload file representing payload of interest, default={PAYLOAD_FILE_OF_INTEREST}',
                        required=False)
    parser.add_argument('--model-id',
                        type=str,
                        default=MODEL,
                        help=f'Model for which data is being analyzed, default={MODEL}',
                        required=False)
    args = parser.parse_args()
    logger.info(f"main, {args} = args")

    # load pricing info (per-instance-type hourly $ price)
    pricing = yaml.safe_load(Path(PRICING_FILE_PATH).read_text())
    logger.info(f"pricing={json.dumps(pricing, indent=2)}")

    # all results files to be parsed
    all_metrics_summary_files = glob.glob(os.path.join(RESULTS_DIR, "results-*",
                                                       "all_metrics_summary.csv"),
                                          recursive=True)
    logger.info(f"found {len(all_metrics_summary_files)} files {all_metrics_summary_files}")
    if not all_metrics_summary_files:
        # fail fast with a clear message instead of the opaque ValueError
        # that pd.concat raises on an empty list
        raise FileNotFoundError(f"no results-*/all_metrics_summary.csv files found under {RESULTS_DIR}")

    # read all results files into a single dataframe
    df = pd.concat(map(pd.read_csv, all_metrics_summary_files))
    logger.info(f"read {len(all_metrics_summary_files)} files in a dataframe of shape {df.shape}")

    # filter to keep only runs that met the latency threshold
    df_selected = df[df.latency_p95 <= args.latency_threshold]
    logger.info(f"after filtering to keep rows with latency_p95 <= {args.latency_threshold}s, df shape {df_selected.shape}")

    # for each (experiment, payload, instance type, instance count) keep the
    # row with the highest concurrency level, i.e. the best throughput that
    # still met the latency threshold; .copy() so the column assignments
    # below do not hit a view (SettingWithCopyWarning / silent no-op)
    grouping_cols = ["experiment_name", "payload_file", "instance_type", "instance_count"]
    df_summary_all = df_selected.loc[
        df_selected.groupby(grouping_cols)['concurrency'].transform("max") == df_selected['concurrency']
    ].copy()

    # find price per txn and price per 1k tokens
    df_summary_all['cost_per_txn'] = df_summary_all.apply(lambda r: cost_per_txn(r, pricing), axis=1)
    df_summary_all['cost_per_1k_tokens'] = df_summary_all.apply(lambda r: cost_per_1k_tokens(r, pricing), axis=1)

    # extrapolate to price per n requests per minute
    for rpm in RPM_LIST:
        col_name = f"instance_count_and_cost_{rpm}_rpm"
        df_summary_all[col_name] = df_summary_all.apply(lambda r: cost_per_n_rpm(r, rpm, pricing), axis=1)

    df_summary_all = df_summary_all.sort_values(by="cost_per_1k_tokens")

    summary_file: str = f"{args.model_id}-summary-p95-latency={args.latency_threshold}s.csv"
    df_summary_all.to_csv(summary_file, index=False)
    logger.info(f"saved df_summary_all dataframe of shape={df_summary_all.shape} in {summary_file}")

    # use args.latency_threshold (not the LATENCY_THRESHOLD constant) so the
    # file name matches the threshold actually applied above
    summary_file_payload_of_interest: str = f"{args.model_id}-summary-{Path(args.payload_file).stem}-p95-latency={args.latency_threshold}s.csv"
    df_summary_payload_of_interest = df_summary_all[df_summary_all.payload_file == args.payload_file]
    df_summary_payload_of_interest = df_summary_payload_of_interest.sort_values(by="cost_per_1k_tokens")

    df_summary_payload_of_interest.to_csv(summary_file_payload_of_interest, index=False)
    logger.info(f"saved df_summary_payload_of_interest dataframe of "
                f"shape={df_summary_payload_of_interest.shape} in {summary_file_payload_of_interest}")
    logger.info("all done")


if __name__ == "__main__":
    main()