From bb08857cafa152105e2776897448f398e6b07d6e Mon Sep 17 00:00:00 2001 From: Rajiv Vaidyanathan Date: Fri, 24 Mar 2023 15:37:28 +0530 Subject: [PATCH 1/3] Building support for local deployment --- sebs/cache.py | 1 + sebs/experiments/perf_cost.py | 40 +++++----- sebs/faas/system.py | 27 ++++--- sebs/local/function.py | 1 + sebs/local/local.py | 134 ++++++++++++++++++---------------- 5 files changed, 110 insertions(+), 93 deletions(-) diff --git a/sebs/cache.py b/sebs/cache.py index ed5096e6..86eff067 100644 --- a/sebs/cache.py +++ b/sebs/cache.py @@ -158,6 +158,7 @@ def update_storage(self, deployment: str, benchmark: str, config: dict): with self._lock: with open(os.path.join(benchmark_dir, "config.json"), "r") as fp: cached_config = json.load(fp) + cached_config[deployment] = {} cached_config[deployment]["storage"] = config with open(os.path.join(benchmark_dir, "config.json"), "w") as fp: json.dump(cached_config, fp, indent=2) diff --git a/sebs/experiments/perf_cost.py b/sebs/experiments/perf_cost.py index 36cde660..400c5f80 100644 --- a/sebs/experiments/perf_cost.py +++ b/sebs/experiments/perf_cost.py @@ -46,21 +46,22 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): self._benchmark = sebs_client.get_benchmark( settings["benchmark"], deployment_client, self.config ) - self._function = deployment_client.get_function(self._benchmark) + self._functions = deployment_client.get_function(self._benchmark, 3) # prepare benchmark input self._storage = deployment_client.get_storage(replace_existing=self.config.update_storage) self._benchmark_input = self._benchmark.prepare_input( storage=self._storage, size=settings["input-size"] ) + for i in range(len(self._functions)): - # add HTTP trigger - triggers = self._function.triggers(Trigger.TriggerType.HTTP) - if len(triggers) == 0: - self._trigger = deployment_client.create_trigger( - self._function, Trigger.TriggerType.HTTP - ) - else: - self._trigger = triggers[0] + # add HTTP trigger + triggers = self._functions[i].triggers(Trigger.TriggerType.HTTP) + if len(triggers) == 0: + self._trigger = deployment_client.create_trigger( + self._functions[i], Trigger.TriggerType.HTTP + ) + else: + self._trigger = triggers[0] self._out_dir = os.path.join(sebs_client.output_dir, "perf-cost") if not os.path.exists(self._out_dir): @@ -77,15 +78,15 @@ def run(self): if len(memory_sizes) == 0: self.logging.info("Begin experiment") self.run_configuration(settings, settings["repetitions"]) - for memory in memory_sizes: - self.logging.info(f"Begin experiment on memory size {memory}") - self._function.memory = memory - self._deployment_client.update_function(self._function, self._benchmark) - self._sebs_client.cache_client.update_function(self._function) - self.run_configuration(settings, settings["repetitions"], suffix=str(memory)) + for i in range(len(self._functions)): + for memory in memory_sizes: + self.logging.info(f"Begin experiment on memory size {memory}") + self._functions[i].memory = memory + self._deployment_client.update_function(self._functions[i], self._benchmark) + self._sebs_client.cache_client.update_function(self._functions[i]) + self.run_configuration(settings, settings["repetitions"], suffix=str(memory)) def compute_statistics(self, times: List[float]): - mean, median, std, cv = basic_stats(times) self.logging.info(f"Mean {mean} [ms], median {median} [ms], std {std}, CV {cv}") for alpha in [0.95, 0.99]: @@ -154,9 +155,8 @@ def _run_configuration( if run_type == PerfCost.RunType.COLD or run_type == PerfCost.RunType.BURST: self._deployment_client.enforce_cold_start( - [self._function], self._benchmark + self._functions, self._benchmark ) - time.sleep(5) results = [] @@ -179,7 +179,8 @@ def _run_configuration( elif run_type == PerfCost.RunType.WARM and ret.stats.cold_start: self.logging.info(f"Invocation {ret.request_id} is cold!") else: - result.add_invocation(self._function, ret) + for i in range(len(self._functions)): + result.add_invocation(self._functions[i], ret) colds_count += ret.stats.cold_start client_times.append(ret.times.client / 1000.0) samples_gathered += 1 @@ -224,7 +225,6 @@ def _run_configuration( ) def run_configuration(self, settings: dict, repetitions: int, suffix: str = ""): - for experiment_type in settings["experiments"]: if experiment_type == "cold": self._run_configuration( diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 64923255..0f3cf174 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -115,7 +115,7 @@ def package_code( pass @abstractmethod - def create_function(self, code_package: Benchmark, func_name: str) -> Function: + def create_function(self, code_package: Benchmark, func_name: str, num: int) -> Function: pass @abstractmethod @@ -139,7 +139,7 @@ def update_function(self, function: Function, code_package: Benchmark): """ - def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) -> Function: + def get_function(self, code_package: Benchmark, num: int, func_name: Optional[str] = None) -> List[Function]: if code_package.language_version not in self.system_config.supported_language_versions( self.name(), code_package.language_name @@ -171,15 +171,16 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) else "function {} not found in cache.".format(func_name) ) self.logging.info("Creating new function! Reason: " + msg) - function = self.create_function(code_package, func_name) - self.cache_client.add_function( - deployment_name=self.name(), - language_name=code_package.language_name, - code_package=code_package, - function=function, - ) - code_package.query_cache() - return function + function_list = self.create_function(code_package, func_name, num) + for function in function_list: + self.cache_client.add_function( + deployment_name=self.name(), + language_name=code_package.language_name, + code_package=code_package, + function=function, + ) + code_package.query_cache() + return function_list else: # retrieve function cached_function = functions[func_name] @@ -221,7 +222,9 @@ def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) code_package.query_cache() else: self.logging.info(f"Cached function {func_name} is up to date.") - return function + function_list = [] + function_list.append(function) + return function_list @abstractmethod def update_function_configuration(self, cached_function: Function, benchmark: Benchmark): diff --git a/sebs/local/function.py b/sebs/local/function.py index a5eb2406..0165462e 100644 --- a/sebs/local/function.py +++ b/sebs/local/function.py @@ -99,6 +99,7 @@ def deserialize(cached_config: dict) -> "LocalFunction": ) except docker.errors.NotFound: raise RuntimeError(f"Cached container {instance_id} not available anymore!") + # clear cache def stop(self): self.logging.info(f"Stopping function container {self._instance_id}") diff --git a/sebs/local/local.py b/sebs/local/local.py index 2d1567b0..d0669de7 100644 --- a/sebs/local/local.py +++ b/sebs/local/local.py @@ -103,7 +103,8 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: """ def shutdown(self): - pass + if hasattr(self, "storage") and self.config.shutdownStorage: + self.storage.stop() """ It would be sufficient to just pack the code and ship it as zip to AWS. @@ -151,8 +152,8 @@ def package_code( return directory, bytes_size - def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunction": - + def create_function(self, code_package: Benchmark, func_name: str, num: int) -> "LocalFunction": + func_list = [] container_name = "{}:run.local.{}.{}".format( self._system_config.docker_repository(), code_package.language_name, @@ -170,66 +171,72 @@ def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunc self.name(), code_package.language_name ), } - container = self._docker_client.containers.run( - image=container_name, - command=f"/bin/bash /sebs/run_server.sh {self.DEFAULT_PORT}", - volumes={code_package.code_location: {"bind": "/function", "mode": "ro"}}, - environment=environment, - # FIXME: make CPUs configurable - # FIXME: configure memory - # FIXME: configure timeout - # cpuset_cpus=cpuset, - # required to access perf counters - # alternative: use custom seccomp profile - privileged=True, - security_opt=["seccomp:unconfined"], - network_mode="bridge", - # somehow removal of containers prevents checkpointing from working? - remove=self.remove_containers, - stdout=True, - stderr=True, - detach=True, - # tty=True, - ) - - pid: Optional[int] = None - if self.measurements_enabled and self._memory_measurement_path is not None: - # launch subprocess to measure memory - proc = subprocess.Popen( - [ - "python3", - "./sebs/local/measureMem.py", - "--container-id", - container.id, - "--measure-interval", - str(self._measure_interval), - "--measurement-file", - self._memory_measurement_path, - ] + for i in range(num): + container = self._docker_client.containers.run( + image=container_name, + name=func_name+f"_{i}", + command=f"/bin/bash /sebs/run_server.sh {self.DEFAULT_PORT}", + volumes={code_package.code_location: {"bind": "/function", "mode": "ro"}}, + environment=environment, + # FIXME: make CPUs configurable + # FIXME: configure memory + # FIXME: configure timeout + # cpuset_cpus=cpuset, + # required to access perf counters + # alternative: use custom seccomp profile + privileged=True, + security_opt=["seccomp:unconfined"], + network_mode="bridge", + # somehow removal of containers prevents checkpointing from working? + remove=self.remove_containers, + stdout=True, + stderr=True, + detach=True, + # tty=True, ) - pid = proc.pid - - function_cfg = FunctionConfig.from_benchmark(code_package) - func = LocalFunction( - container, - self.DEFAULT_PORT, - func_name, - code_package.benchmark, - code_package.hash, - function_cfg, - pid, - ) - self.logging.info( - f"Started {func_name} function at container {container.id} , running on {func._url}" - ) - return func - """ - FIXME: restart Docker? - """ + pid: Optional[int] = None + if self.measurements_enabled and self._memory_measurement_path is not None: + # launch subprocess to measure memory + proc = subprocess.Popen( + [ + "python3", + "./sebs/local/measureMem.py", + "--container-id", + container.id, + "--measure-interval", + str(self._measure_interval), + "--measurement-file", + self._memory_measurement_path, + ] + ) + pid = proc.pid + + function_cfg = FunctionConfig.from_benchmark(code_package) + func = LocalFunction( + container, + self.DEFAULT_PORT, + func_name, + code_package.benchmark, + code_package.hash, + function_cfg, + pid, + ) + func_list.append(func) + self.logging.info( + f"Started {func_name} function at container {container.id} , running on {func._url}" + ) + return func_list def update_function(self, function: Function, code_package: Benchmark): - pass + # kill existing containers + count = 0 + for ctr in self._docker_client.containers.list(): + if ctr.name in function.name: + count += 1 + ctr.kill() + # deploy new containers with updated function + self.create_function(code_package, function.name, count) """ For local functions, we don't need to do anything for a cached function. @@ -251,7 +258,8 @@ def create_trigger(self, func: Function, trigger_type: Trigger.TriggerType) -> T return trigger def cached_function(self, function: Function): - pass + for trigger in function.triggers(Trigger.TriggerType.LIBRARY): + trigger.logging_handlers = self.logging_handlers def update_function_configuration(self, function: Function, code_package: Benchmark): self.logging.error("Updating function configuration of local deployment is not supported") @@ -268,7 +276,11 @@ def download_metrics( pass def enforce_cold_start(self, functions: List[Function], code_package: Benchmark): - raise NotImplementedError() + fn_names = [fn.name for fn in functions] + for ctr in self._docker_client.containers.list(): + for i in range(len(fn_names)): + if ctr.name in fn_names[i]: + ctr.kill() @staticmethod def default_function_name(code_package: Benchmark) -> str: From 56a38b6d97687881a5bdf42631dddff6b9c0df69 Mon Sep 17 00:00:00 2001 From: Rajiv Vaidyanathan Date: Tue, 28 Mar 2023 23:20:39 +0530 Subject: [PATCH 2/3] Updated comment with FIXME --- sebs/local/function.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sebs/local/function.py b/sebs/local/function.py index 0165462e..a918159c 100644 --- a/sebs/local/function.py +++ b/sebs/local/function.py @@ -99,7 +99,7 @@ def deserialize(cached_config: dict) -> "LocalFunction": ) except docker.errors.NotFound: raise RuntimeError(f"Cached container {instance_id} not available anymore!") - # clear cache + # FIXME: clear cache def stop(self): self.logging.info(f"Stopping function container {self._instance_id}") From 9e7b64e40ee4540a79268f875ff1bd68a879ef0d Mon Sep 17 00:00:00 2001 From: Rajiv Vaidyanathan Date: Sun, 2 Apr 2023 04:44:51 +0530 Subject: [PATCH 3/3] Reset create_function() API and associated changes --- sebs/experiments/perf_cost.py | 37 +++++------ sebs/faas/system.py | 27 ++++---- sebs/local/local.py | 119 ++++++++++++++++------------------ 3 files changed, 86 insertions(+), 97 deletions(-) diff --git a/sebs/experiments/perf_cost.py b/sebs/experiments/perf_cost.py index 400c5f80..5372e887 100644 --- a/sebs/experiments/perf_cost.py +++ b/sebs/experiments/perf_cost.py @@ -46,22 +46,21 @@ def prepare(self, sebs_client: "SeBS", deployment_client: FaaSSystem): self._benchmark = sebs_client.get_benchmark( settings["benchmark"], deployment_client, self.config ) - self._functions = deployment_client.get_function(self._benchmark, 3) + self._function = deployment_client.get_function(self._benchmark) # prepare benchmark input self._storage = deployment_client.get_storage(replace_existing=self.config.update_storage) self._benchmark_input = self._benchmark.prepare_input( storage=self._storage, size=settings["input-size"] ) - for i in range(len(self._functions)): - # add HTTP trigger - triggers = self._functions[i].triggers(Trigger.TriggerType.HTTP) - if len(triggers) == 0: - self._trigger = deployment_client.create_trigger( - self._functions[i], Trigger.TriggerType.HTTP - ) - else: - self._trigger = triggers[0] + # add HTTP trigger + triggers = self._function.triggers(Trigger.TriggerType.HTTP) + if len(triggers) == 0: + self._trigger = deployment_client.create_trigger( + self._function, Trigger.TriggerType.HTTP + ) + else: + self._trigger = triggers[0] self._out_dir = os.path.join(sebs_client.output_dir, "perf-cost") if not os.path.exists(self._out_dir): @@ -78,13 +77,12 @@ def run(self): if len(memory_sizes) == 0: self.logging.info("Begin experiment") self.run_configuration(settings, settings["repetitions"]) - for i in range(len(self._functions)): - for memory in memory_sizes: - self.logging.info(f"Begin experiment on memory size {memory}") - self._functions[i].memory = memory - self._deployment_client.update_function(self._functions[i], self._benchmark) - self._sebs_client.cache_client.update_function(self._functions[i]) - self.run_configuration(settings, settings["repetitions"], suffix=str(memory)) + for memory in memory_sizes: + self.logging.info(f"Begin experiment on memory size {memory}") + self._function.memory = memory + self._deployment_client.update_function(self._function, self._benchmark) + self._sebs_client.cache_client.update_function(self._function) + self.run_configuration(settings, settings["repetitions"], suffix=str(memory)) def compute_statistics(self, times: List[float]): mean, median, std, cv = basic_stats(times) @@ -155,7 +153,7 @@ def _run_configuration( if run_type == PerfCost.RunType.COLD or run_type == PerfCost.RunType.BURST: self._deployment_client.enforce_cold_start( - self._functions, self._benchmark + [self._function], self._benchmark ) time.sleep(5) @@ -179,8 +177,7 @@ def _run_configuration( elif run_type == PerfCost.RunType.WARM and ret.stats.cold_start: self.logging.info(f"Invocation {ret.request_id} is cold!") else: - for i in range(len(self._functions)): - result.add_invocation(self._functions[i], ret) + result.add_invocation(self._function, ret) colds_count += ret.stats.cold_start client_times.append(ret.times.client / 1000.0) samples_gathered += 1 diff --git a/sebs/faas/system.py b/sebs/faas/system.py index 0f3cf174..64923255 100644 --- a/sebs/faas/system.py +++ b/sebs/faas/system.py @@ -115,7 +115,7 @@ def package_code( pass @abstractmethod - def create_function(self, code_package: Benchmark, func_name: str, num: int) -> Function: + def create_function(self, code_package: Benchmark, func_name: str) -> Function: pass @abstractmethod @@ -139,7 +139,7 @@ def update_function(self, function: Function, code_package: Benchmark): """ - def get_function(self, code_package: Benchmark, num: int, func_name: Optional[str] = None) -> List[Function]: + def get_function(self, code_package: Benchmark, func_name: Optional[str] = None) -> Function: if code_package.language_version not in self.system_config.supported_language_versions( self.name(), code_package.language_name @@ -171,16 +171,15 @@ def get_function(self, code_package: Benchmark, num: int, func_name: Optional[st else "function {} not found in cache.".format(func_name) ) self.logging.info("Creating new function! Reason: " + msg) - function_list = self.create_function(code_package, func_name, num) - for function in function_list: - self.cache_client.add_function( - deployment_name=self.name(), - language_name=code_package.language_name, - code_package=code_package, - function=function, - ) - code_package.query_cache() - return function_list + function = self.create_function(code_package, func_name) + self.cache_client.add_function( + deployment_name=self.name(), + language_name=code_package.language_name, + code_package=code_package, + function=function, + ) + code_package.query_cache() + return function else: # retrieve function cached_function = functions[func_name] @@ -222,9 +221,7 @@ def get_function(self, code_package: Benchmark, num: int, func_name: Optional[st code_package.query_cache() else: self.logging.info(f"Cached function {func_name} is up to date.") - function_list = [] - function_list.append(function) - return function_list + return function @abstractmethod def update_function_configuration(self, cached_function: Function, benchmark: Benchmark): diff --git a/sebs/local/local.py b/sebs/local/local.py index d0669de7..e1513727 100644 --- a/sebs/local/local.py +++ b/sebs/local/local.py @@ -105,7 +105,6 @@ def get_storage(self, replace_existing: bool = False) -> PersistentStorage: def shutdown(self): if hasattr(self, "storage") and self.config.shutdownStorage: self.storage.stop() - """ It would be sufficient to just pack the code and ship it as zip to AWS. However, to have a compatible function implementation across providers, @@ -152,8 +151,8 @@ def package_code( return directory, bytes_size - def create_function(self, code_package: Benchmark, func_name: str, num: int) -> "LocalFunction": - func_list = [] + def create_function(self, code_package: Benchmark, func_name: str) -> "LocalFunction": + container_name = "{}:run.local.{}.{}".format( self._system_config.docker_repository(), code_package.language_name, @@ -171,72 +170,69 @@ def create_function(self, code_package: Benchmark, func_name: str, num: int) -> self.name(), code_package.language_name ), } - for i in range(num): - container = self._docker_client.containers.run( - image=container_name, - name=func_name+f"_{i}", - command=f"/bin/bash /sebs/run_server.sh {self.DEFAULT_PORT}", - volumes={code_package.code_location: {"bind": "/function", "mode": "ro"}}, - environment=environment, - # FIXME: make CPUs configurable - # FIXME: configure memory - # FIXME: configure timeout - # cpuset_cpus=cpuset, - # required to access perf counters - # alternative: use custom seccomp profile - privileged=True, - security_opt=["seccomp:unconfined"], - network_mode="bridge", - # somehow removal of containers prevents checkpointing from working? - remove=self.remove_containers, - stdout=True, - stderr=True, - detach=True, - # tty=True, - ) - pid: Optional[int] = None - if self.measurements_enabled and self._memory_measurement_path is not None: - # launch subprocess to measure memory - proc = subprocess.Popen( - [ - "python3", - "./sebs/local/measureMem.py", - "--container-id", - container.id, - "--measure-interval", - str(self._measure_interval), - "--measurement-file", - self._memory_measurement_path, - ] - ) - pid = proc.pid - - function_cfg = FunctionConfig.from_benchmark(code_package) - func = LocalFunction( - container, - self.DEFAULT_PORT, - func_name, - code_package.benchmark, - code_package.hash, - function_cfg, - pid, - ) - func_list.append(func) - self.logging.info( - f"Started {func_name} function at container {container.id} , running on {func._url}" + container = self._docker_client.containers.run( + image=container_name, + name=func_name, + command=f"/bin/bash /sebs/run_server.sh {self.DEFAULT_PORT}", + volumes={code_package.code_location: {"bind": "/function", "mode": "ro"}}, + environment=environment, + # FIXME: make CPUs configurable + # FIXME: configure memory + # FIXME: configure timeout + # cpuset_cpus=cpuset, + # required to access perf counters + # alternative: use custom seccomp profile + privileged=True, + security_opt=["seccomp:unconfined"], + network_mode="bridge", + # somehow removal of containers prevents checkpointing from working? + remove=self.remove_containers, + stdout=True, + stderr=True, + detach=True, + # tty=True, + ) + + pid: Optional[int] = None + if self.measurements_enabled and self._memory_measurement_path is not None: + # launch subprocess to measure memory + proc = subprocess.Popen( + [ + "python3", + "./sebs/local/measureMem.py", + "--container-id", + container.id, + "--measure-interval", + str(self._measure_interval), + "--measurement-file", + self._memory_measurement_path, + ] ) - return func_list + pid = proc.pid + + function_cfg = FunctionConfig.from_benchmark(code_package) + func = LocalFunction( + container, + self.DEFAULT_PORT, + func_name, + code_package.benchmark, + code_package.hash, + function_cfg, + pid, + ) + self.logging.info( + f"Started {func_name} function at container {container.id} , running on {func._url}" + ) + return func def update_function(self, function: Function, code_package: Benchmark): # kill existing containers - count = 0 for ctr in self._docker_client.containers.list(): if ctr.name in function.name: - count += 1 ctr.kill() # deploy new containers with updated function - self.create_function(code_package, function.name, count) + self.create_function(code_package, function.name) """ For local functions, we don't need to do anything for a cached function. @@ -278,8 +274,7 @@ def download_metrics( def enforce_cold_start(self, functions: List[Function], code_package: Benchmark): fn_names = [fn.name for fn in functions] for ctr in self._docker_client.containers.list(): - for i in range(len(fn_names)): - if ctr.name in fn_names[i]: + if ctr.name in fn_names: ctr.kill() @staticmethod