Skip to content

[RAPTOR-11320] add a layer to extract lazy loading info from environment variables and secretes #1117

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
18 changes: 18 additions & 0 deletions custom_model_runner/datarobot_drum/lazy_loading/constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Copyright 2024 DataRobot, Inc. and its affiliates.
#
# All rights reserved.
#
# DataRobot, Inc.
#
# This is proprietary source code of DataRobot, Inc. and its
# affiliates.
#
# Released under the terms of DataRobot Tool and Utility Agreement.

MLOPS_RUNTIME_PARAM_PREFIX = "MLOPS_RUNTIME_PARAM_"
MLOPS_LAZY_LOADING_DATA_ENV_VARIABLE = "MLOPS_LAZY_LOADING_DATA"
MLOPS_REPOSITORY_SECRET_PREFIX = "MLOPS_REPOSITORY_SECRET_PREFIX_"
AWS_DEFAULT_REGION = "us-east-1"
REMOTE_FILE_SUFFIX = ".remote"
METADATA_FILE = "model-metadata.yaml"
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#
# Copyright 2024 DataRobot, Inc. and its affiliates.
#
# All rights reserved.
#
# DataRobot, Inc.
#
# This is proprietary source code of DataRobot, Inc. and its
# affiliates.
#
# Released under the terms of DataRobot Tool and Utility Agreement.
import json
import logging
import os

from custom_model_runner.datarobot_drum.lazy_loading.constants import MLOPS_REPOSITORY_SECRET_PREFIX

logger = logging.getLogger(__name__)


def handle_credentials_param(credential_id):
"""
Take a credential_id and create corresponding credentials object from env variable
so the client that requires the credentials can use that object.

:param credential_id:
"""
credentials_env_variable = MLOPS_REPOSITORY_SECRET_PREFIX + credential_id.upper()
param_json = os.environ.get(credentials_env_variable, None)
if param_json is None:
raise EnvironmentError(
"expected environment variable '{}' to be set".format(credentials_env_variable)
)
# logger.debug(f"param_json: {param_json}") TODO: mask credentials for logging

json_content = json.loads(param_json)
if param_json is None:
raise EnvironmentError(
"expected environment variable '{}' to be json".format(credentials_env_variable)
)

logger.debug("Successfully loaded JSON content")
return json_content["payload"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Copyright 2024 DataRobot, Inc. and its affiliates.
#
# All rights reserved.
#
# DataRobot, Inc.
#
# This is proprietary source code of DataRobot, Inc. and its
# affiliates.
#
# Released under the terms of DataRobot Tool and Utility Agreement.
import logging
import time
from multiprocessing import Lock

from custom_model_runner.datarobot_drum.lazy_loading.remote_file import RemoteFile

logger = logging.getLogger(__name__)


class ProgressPercentage:
def __init__(
self,
update_interval_secs=10,
update_interval_mb=100,
remote_file: RemoteFile = None,
):
self._remote_file = remote_file
self._update_interval_secs = update_interval_secs
self._update_interval_bytes = update_interval_mb * 1024 * 1024

self._seen_so_far = 0
self._lock = Lock()
self._last_update_time = time.time()
self._last_update_size_bytes = 0

def _reset(self):
self._seen_so_far = 0
self._lock = Lock()
self._last_update_time = time.time()
self._last_update_size_bytes = 0

def set_file(self, remote_file: RemoteFile):
self._remote_file = remote_file
self._reset()

def __call__(self, bytes_amount):
if self._remote_file is None:
raise Exception("remote_file attribute is None.")
with self._lock:
self._seen_so_far += bytes_amount
current_time = time.time()
if (
current_time - self._last_update_time >= self._update_interval_secs
or self._seen_so_far >= (self._last_update_size_bytes + self._update_interval_bytes)
):
self._print_progress()
self._last_update_time = current_time
self._last_update_size_bytes = self._seen_so_far

def _print_progress(self):
seen_so_far_mb = self._seen_so_far / (1024**2)
percentage = (seen_so_far_mb / self._remote_file.size_mb) * 100
# logger.info( #TODO: fix logging
print(
f"{self._remote_file.remote_path} {percentage:.2f}% ({seen_so_far_mb:.1f}/{self._remote_file.size_mb:.1f} MB)\n",
end="",
flush=True,
)

@staticmethod
def done_downloading_file(remote_file: RemoteFile):
logger.info(
f"Done downloading file: Total Time: {remote_file.download_time:.1f} sec, rate: {remote_file.download_rate_mb_sec:.1f} MB/sec"
)
53 changes: 53 additions & 0 deletions custom_model_runner/datarobot_drum/lazy_loading/remote_file.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#
# Copyright 2024 DataRobot, Inc. and its affiliates.
#
# All rights reserved.
#
# DataRobot, Inc.
#
# This is proprietary source code of DataRobot, Inc. and its
# affiliates.
#
# Released under the terms of DataRobot Tool and Utility Agreement.
# from datarobot_custom_code.utils import calculate_rate


class RemoteFile:
def __init__(self, remote_path, local_path, repository_id):
self.remote_path = remote_path
self.local_path = local_path
self.repository_id = repository_id
self.size_bytes = None
self.download_time = None
self.download_start_time = None
self.download_status = None
self.error_msg = None

@property
def size_mb(self):
if self.size_bytes is None:
return None
return self.size_bytes / 1024 / 1024

@property
def download_rate_mb_sec(self):
if self.download_time > 0:
return (self.size_bytes / self.download_time) / (1024 * 1024)
else:
return None

def __str__(self):
s = f"<RemoteFile {self.remote_path}, local: {self.local_path}, repo: {self.repository_id}"
if self.size_bytes is None:
s += ", size: N/A"
else:
s += f", size: {self.size_bytes:.1f} MB"

if self.download_time is None:
s += ", download_time: N/A"
else:
s += f", download_time: {self.download_time:.1f} seconds"

if self.error_msg is not None:
s += f"\nerror: {self.error_msg}"
return s
Loading