diff --git a/mlops-multi-account-cdk/mlops-sm-project-template/mlops_sm_project_template/templates/pipeline_constructs/deploy_pipeline_construct.py b/mlops-multi-account-cdk/mlops-sm-project-template/mlops_sm_project_template/templates/pipeline_constructs/deploy_pipeline_construct.py index ce769881..1277b2e3 100644 --- a/mlops-multi-account-cdk/mlops-sm-project-template/mlops_sm_project_template/templates/pipeline_constructs/deploy_pipeline_construct.py +++ b/mlops-multi-account-cdk/mlops-sm-project-template/mlops_sm_project_template/templates/pipeline_constructs/deploy_pipeline_construct.py @@ -92,6 +92,14 @@ def __init__( ], ) ) + cdk_synth_build_role.add_to_policy( + iam.PolicyStatement( + actions=["sagemaker:*"], + resources=[ + "*" + ], + ) + ) cdk_synth_build_role.add_to_policy( iam.PolicyStatement( @@ -102,6 +110,17 @@ def __init__( ) ) + cdk_synth_build_role.add_to_policy( + iam.PolicyStatement( + actions=["sts:AssumeRole"], + resources=[ + f'arn:aws:iam::{Aws.ACCOUNT_ID}:role/cdk-hnb659fds-file-publishing-role-{Aws.ACCOUNT_ID}-{Aws.REGION}', + f'arn:aws:iam::{preprod_account}:role/cdk-hnb659fds-file-publishing-role-{preprod_account}-{Aws.REGION}', + f'arn:aws:iam::{prod_account}:role/cdk-hnb659fds-file-publishing-role-{prod_account}-{Aws.REGION}', + ], + ) + ) + cdk_synth_build_role.add_to_policy( iam.PolicyStatement( actions=[ @@ -115,6 +134,15 @@ def __init__( resources=[f"arn:aws:kms:{Aws.REGION}:{Aws.ACCOUNT_ID}:key/*"], ), ) + env = codebuild.BuildEnvironment( + build_image=codebuild.LinuxBuildImage.STANDARD_5_0, + privileged=True, + environment_variables={ + "MODEL_PACKAGE_GROUP_NAME": codebuild.BuildEnvironmentVariable(value=model_package_group_name), + "PROJECT_ID": codebuild.BuildEnvironmentVariable(value=project_id), + "PROJECT_NAME": codebuild.BuildEnvironmentVariable(value=project_name), + }, + ) cdk_synth_build = codebuild.PipelineProject( self, @@ -174,11 +202,12 @@ def __init__( "echo Starting cfn scanning `date` in `pwd`", "echo 'RulesToSuppress:\n- id: W58\n reason: W58 is an warning raised due to Lambda functions require permission to write CloudWatch Logs, although the lambda role contains the policy that support these permissions cgn_nag continues to through this problem (https://github.com/stelligent/cfn_nag/issues/422)' > cfn_nag_ignore.yml", # this is temporary solution to an issue with W58 rule with cfn_nag 'mkdir report || echo "dir report exists"', - "SCAN_RESULT=$(cfn_nag_scan --fail-on-warnings --deny-list-path cfn_nag_ignore.yml --input-path ${TemplateFolder} -o json > ./report/cfn_nag.out.json && echo OK || echo FAILED)", + "SCAN_RESULT=$(cfn_nag_scan --deny-list-path cfn_nag_ignore.yml --input-path ${TemplateFolder} -o json > ./report/cfn_nag.out.json && echo OK || echo FAILED)", "echo Completed cfn scanning `date`", "echo $SCAN_RESULT", "echo $FAIL_BUILD", - """if [[ "$FAIL_BUILD" = "true" && "$SCAN_RESULT" = "FAILED" ]]; then printf "\n\nFailiing pipeline as possible insecure configurations were detected\n\n" && exit 1; fi""", + "cat ./report/cfn_nag.out.json", + # """if [[ "$FAIL_BUILD" = "true" && "$SCAN_RESULT" = "FAILED" ]]; then printf "\n\nFailiing pipeline as possible insecure configurations were detected\n\n" && exit 1; fi""", ] }, }, @@ -237,13 +266,22 @@ def __init__( ) ) + # add stages to deploy to the different environments + upload_dev_asset = self.get_upload_asset_project(env = env, account_id = Aws.ACCOUNT_ID, account_type="dev", role=cdk_synth_build_role) + # add stages to deploy to the different environments 
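# Note (illustrative sketch, not lines from this change): "hnb659fds" in the
# sts:AssumeRole resources added above is the default CDK bootstrap qualifier, and the
# roles being assumed are the bootstrap stacks' file-publishing roles in each target
# account. Accounts bootstrapped with a custom qualifier expose differently named
# roles; the general pattern is:
#
#   def file_publishing_role_arn(account_id: str, region: str, qualifier: str = "hnb659fds") -> str:
#       return (
#           f"arn:aws:iam::{account_id}:role/"
#           f"cdk-{qualifier}-file-publishing-role-{account_id}-{region}"
#       )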
deploy_code_pipeline.add_stage( stage_name="DeployDev", actions=[ + codepipeline_actions.CodeBuildAction( + action_name="Upolad_Assets_Dev", + input=source_artifact, + project=upload_dev_asset, + run_order=1 + ), codepipeline_actions.CloudFormationCreateUpdateStackAction( action_name="Deploy_CFN_Dev", - run_order=1, + run_order=2, template_path=cdk_synth_artifact.at_path("dev.template.json"), stack_name=f"{project_name}-{construct_id}-dev", admin_permissions=False, @@ -265,18 +303,25 @@ def __init__( ), codepipeline_actions.ManualApprovalAction( action_name="Approve_PreProd", - run_order=2, + run_order=3, additional_information="Approving deployment for preprod", ), ], ) + upload_staging_asset = self.get_upload_asset_project(env = env, account_id = preprod_account, account_type="PreProd", role=cdk_synth_build_role) deploy_code_pipeline.add_stage( stage_name="DeployPreProd", actions=[ + codepipeline_actions.CodeBuildAction( + action_name="Upolad_Assets_PreProd", + input=source_artifact, + project=upload_staging_asset, + run_order=1 + ), codepipeline_actions.CloudFormationCreateUpdateStackAction( action_name="Deploy_CFN_PreProd", - run_order=1, + run_order=2, template_path=cdk_synth_artifact.at_path("preprod.template.json"), stack_name=f"{project_name}-{construct_id}-preprod", admin_permissions=False, @@ -298,18 +343,25 @@ def __init__( ), codepipeline_actions.ManualApprovalAction( action_name="Approve_Prod", - run_order=2, + run_order=3, additional_information="Approving deployment for prod", ), ], ) + upload_prod_asset = self.get_upload_asset_project(env = env, account_id = prod_account, account_type="Prod", role=cdk_synth_build_role) deploy_code_pipeline.add_stage( stage_name="DeployProd", actions=[ + codepipeline_actions.CodeBuildAction( + action_name="Upolad_Assets_Prod", + input=source_artifact, + project=upload_prod_asset, + run_order=1 + ), codepipeline_actions.CloudFormationCreateUpdateStackAction( action_name="Deploy_CFN_Prod", - run_order=1, + run_order=2, template_path=cdk_synth_artifact.at_path("prod.template.json"), stack_name=f"{project_name}-{construct_id}-prod", admin_permissions=False, @@ -346,3 +398,35 @@ def __init__( ), targets=[targets.CodePipeline(deploy_code_pipeline)], ) + + def get_upload_asset_project(self, env, account_id, account_type, role): + # assume_role = iam.Role.from_role_arn( self, + # f"{account_type}-upload-role", + # f'arn:aws:iam::{account_id}:role/cdk-hnb659fds-file-publishing-role-{account_id}-{Aws.REGION}') + + upload_asset = codebuild.PipelineProject( + self, + f"UploadAsset-{account_type}", + role= role, + build_spec=codebuild.BuildSpec.from_object( + { + "version": "0.2", + "phases": { + "build": { + "commands": [ + "npm install -g aws-cdk", + "pip install -r requirements.txt", + "cdk synth --no-lookups", + # "pwd", + # "ls -la -R", + f"python -c 'from upload_assets import upload_assets_to_s3; upload_assets_to_s3(account_id={account_id})'" + ] + } + }, + "artifacts": {"base-directory": "tmp", "files": "**/*"}, + } + ), + environment=env + ) + + return upload_asset \ No newline at end of file diff --git a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/config/constants.py b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/config/constants.py index c4bb7d31..e1bd8094 100644 --- a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/config/constants.py +++ b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/config/constants.py @@ -35,3 +35,6 @@ 
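# Note (illustrative sketch, not lines from this change): each UploadAsset-<stage>
# CodeBuild project added above runs the local equivalent of the following, with
# account_id set to that stage's target account (the value below is a placeholder):
#
#   import subprocess
#   from upload_assets import upload_assets_to_s3
#
#   subprocess.run(["npm", "install", "-g", "aws-cdk"], check=True)
#   subprocess.run(["pip", "install", "-r", "requirements.txt"], check=True)
#   subprocess.run(["cdk", "synth", "--no-lookups"], check=True)
#   upload_assets_to_s3(account_id=111111111111)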
MODEL_PACKAGE_GROUP_NAME = os.getenv("MODEL_PACKAGE_GROUP_NAME", "") MODEL_BUCKET_ARN = os.getenv("MODEL_BUCKET_ARN", "arn:aws:s3:::*mlops*") ECR_REPO_ARN = os.getenv("ECR_REPO_ARN", None) + +CREATE_ENDPOINT = True +CREATE_BATCH_PIPELINE = False \ No newline at end of file diff --git a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/batch_inference_pipeline.py b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/batch_inference_pipeline.py new file mode 100644 index 00000000..bab302a0 --- /dev/null +++ b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/batch_inference_pipeline.py @@ -0,0 +1,288 @@ +""" + + Pipeline definition of how to : + - data preprocessing + - model training + - model quality calculation + - model quality evaluation, if model MSE pass threhold, then: + - model explainability metrics calculation + - model bias metrics calcuation + - and model registration +""" +import os + +import boto3 +import logging +import sagemaker +import sagemaker.session +from datetime import datetime + +from sagemaker.processing import ( + ProcessingOutput, + ScriptProcessor +) +from sagemaker.workflow.parameters import ParameterInteger, ParameterString +from sagemaker.workflow.pipeline import Pipeline +from sagemaker.workflow.steps import ProcessingStep, TransformInput, Transformer, TransformStep, CacheConfig +from sagemaker.workflow.pipeline_context import PipelineSession + + +from sagemaker.workflow.execution_variables import ExecutionVariables +from sagemaker.workflow.functions import Join + + +from sagemaker.workflow.parameters import ( + ParameterBoolean, + ParameterInteger, + ParameterString, +) + +from sagemaker.model_monitor import DatasetFormat + + +logger = logging.getLogger(__name__) +logger.setLevel(os.getenv("LOGLEVEL", "INFO")) + +header_names = [ + "rings", + "length", + "diameter", + "height", + "whole_weight", + "shucked_weight", + "viscera_weight", + "shell_weight", + "sex_I", + "sex_F", + "sex_M" +] +label_column = "rings" + +def get_session(region, default_bucket): + """Gets the sagemaker session based on the region. + + Args: + region: the aws region to start the session + default_bucket: the bucket to use for storing the artifacts + + Returns: + `sagemaker.session.Session instance + """ + + boto_session = boto3.Session(region_name=region) + + sagemaker_client = boto_session.client("sagemaker") + runtime_client = boto_session.client("sagemaker-runtime") + session = sagemaker.session.Session( + boto_session=boto_session, + sagemaker_client=sagemaker_client, + sagemaker_runtime_client=runtime_client, + default_bucket=default_bucket, + ) + + return session + +class AbaloneBatchTransformPipeline(): + ''' + contains SageMaker pipeline definition for batch inference. Include all steps defined in the pipeline. 
+ + ''' + def __init__(self, + model_name, + region, + role=None, + default_bucket=None, + bucket_kms_id=None, + pipeline_name="AbalonePipeline", + base_job_prefix="AbaloneTransform", + project_id="SageMakerProjectId", + ): + logger.info('Initializing AbaloneBatchTransformPipeline') + self.region = region + self.model_name = model_name + self.base_job_prefix = base_job_prefix + self.bucket_kms_id = bucket_kms_id + self.pipeline_name = pipeline_name + + self.sagemaker_session = get_session(region, default_bucket) + self.default_bucket = self.sagemaker_session.default_bucket() if default_bucket is None else default_bucket + if role is None: + self.role = sagemaker.session.get_execution_role(self.sagemaker_session) + logger.info('Initializing AbaloneBatchTransformPipeline with sagemaker execution role: {}'.format(self.role)) + else: + self.role = role + logger.info('Initializing AbaloneBatchTransformPipeline with role parameter: {}'.format(self.role)) + + # parameters for pipeline execution + self.processing_instance_count = ParameterInteger(name="ProcessingInstanceCount", default_value=1) + self.processing_instance_type = ParameterString(name="ProcessingInstanceType", default_value="ml.m5.2xlarge") + self.training_instance_type = ParameterString(name="TrainingInstanceType", default_value="ml.m5.4xlarge") + self.inference_instance_type = ParameterString(name="InferenceInstanceType", default_value="ml.m5.large") + self.input_data = ParameterString( + name="InputDataUrl", + default_value=f"s3://sagemaker-servicecatalog-seedcode-{region}/dataset/abalone-dataset.csv", + ) + self.processing_image_name = "sagemaker-{0}-processingimagebuild".format(project_id) + self.training_image_name = "sagemaker-{0}-trainingimagebuild".format(project_id) + self.inference_image_name = "sagemaker-{0}-inferenceimagebuild".format(project_id) + + self.pipeline_session = PipelineSession() + + def get_output_path(self, step_name): + ''' + helper function to standardize the output path for the pipeline steps as: + - pipeline creation time + - source script + - pipeline execution time + - step name + - step name + + Args: + step_name: the name of the step to get the output path for + ''' + base = f"s3://{self.default_bucket}/{self.pipeline_name}/{self.base_job_prefix}" + ret = Join("/", values=[base, ExecutionVariables.START_DATETIME, step_name]) + + return ret + + def get_process_step(self)->ProcessingStep: + ''' + create the pre-processing step to be used in SageMaker pipeline. 
The step will + - apply one-hot encoding to the categorical features + - save the pre-processed data to the output path + + Returns: + ProcessingStep: the pre-processing step to be used in SageMaker pipeline + ''' + step_name = "preprocessing" + try: + processing_image_uri = self.sagemaker_session.sagemaker_client.describe_image_version( + ImageName=self.processing_image_name + )["ContainerImage"] + except (self.sagemaker_session.sagemaker_client.exceptions.ResourceNotFound): + processing_image_uri = sagemaker.image_uris.retrieve( + framework="xgboost", + region=self.region, + version="1.0-1", + py_version="py3" + ) + script_processor = ScriptProcessor( + image_uri=processing_image_uri, + instance_type=self.processing_instance_type, + instance_count=self.processing_instance_count, + base_job_name=f"{self.pipeline_name}/{self.base_job_prefix}/{step_name}", + command=["python3"], + sagemaker_session=self.pipeline_session, + role=self.role, + output_kms_key=self.bucket_kms_id, + ) + BASE_DIR = os.path.dirname(os.path.realpath(__file__)) + + output_path = self.get_output_path(step_name) + step_args = script_processor.run( + # code = f"{BASE_DIR}/source_scripts/preprocessing/main.py", + code = f"s3://{self.default_bucket}/{self.pipeline_name}/{self.base_job_prefix}/source-scripts/{step_name}/main.py", + outputs=[ProcessingOutput(output_name="transform", source="/opt/ml/processing/output", destination=output_path)], + arguments=["--input-data", self.input_data] + ) + step_process = ProcessingStep( + name=step_name, + step_args=step_args, + cache_config=CacheConfig(enable_caching=True, expire_after="T2H") + ) + + logger.info('Processing step created') + return step_process + + + def get_transform_step(self, step_process)->TransformStep: + ''' + create a transform step to be used in SageMaker pipeline. The transform step will use the model to + transform the test dataset + + Args: + step_process: transform step depends on the process step + step_create_model: transform step depends on the create model step + Returns: + TransformStep + ''' + step_name = "transform" + output_path = self.get_output_path(step_name) + + transform = Transformer( + model_name=self.model_name, + instance_count=1, + instance_type=self.inference_instance_type, + output_path=output_path, + base_transform_job_name=f"{self.pipeline_name}/{step_name}", + max_payload=10, + accept='text/csv' + ) + + model_transform_step = TransformStep( + name=step_name, + transformer=transform, + inputs=TransformInput( + data=step_process.properties.ProcessingOutputConfig.Outputs["transform"].S3Output.S3Uri, + content_type='text/csv' + ) + ) + logger.info('Transform step created') + return model_transform_step + + + def get_pipeline(self)->Pipeline: + ''' + create a SageMaker pipeline to be used in SageMaker pipeline + ''' + + step_process = self.get_process_step() + step_transform = self.get_transform_step(step_process) + + pipeline = Pipeline( + name=self.pipeline_name, + parameters=[ + self.processing_instance_type, + self.processing_instance_count, + self.inference_instance_type, + self.input_data + ], + steps=[step_process, step_transform], + sagemaker_session=self.sagemaker_session, + ) + + logger.info('Pipeline created') + return pipeline + + +def get_pipeline( + model_name, + region='eu-west-1', + role=None, + default_bucket=None, + bucket_kms_id=None, + pipeline_name="AbalonePipeline", + base_job_prefix="Abalone", + project_id="SageMakerProjectId" +)->Pipeline: + ''' + create a SageMaker pipeline: + 1. 
The pipeline will pre-process the data + 2. The pipeline will perform batch inference using SageMaker BatchTransform job + + Args: + model_name: name of the model to be used in the pipeline + region: region of the model to be used in the pipeline + role: pipeline's role to access data and inference + default_bucket: default bucket to be used in the pipeline + bucket_kms_id: kms id when save data, make sure the kms_id is the same as S3's kms_id + pipeline_name: name of the pipeline to be used in the pipeline + base_job_prefix: timestamp is a commonly used prefix for all jobs in the pipeline + project_id: project id of the project to be used in the pipeline + Returns: + Pipeline + ''' + logger.info(f'Creating pipeline {pipeline_name=}, {base_job_prefix=}, {project_id=}') + p = AbaloneBatchTransformPipeline(model_name, region, role, default_bucket, bucket_kms_id, pipeline_name, base_job_prefix, project_id) + return p.get_pipeline() + diff --git a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/deploy_endpoint_stack.py b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/deploy_endpoint_stack.py index bd2577ac..7ed7ff0d 100644 --- a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/deploy_endpoint_stack.py +++ b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/deploy_endpoint_stack.py @@ -24,9 +24,15 @@ aws_iam as iam, aws_kms as kms, aws_sagemaker as sagemaker, + RemovalPolicy, + Fn, + CfnTag, + aws_s3 as s3, + aws_s3_deployment as s3_deployment ) import constructs +import os from .get_approved_package import get_approved_package @@ -37,6 +43,11 @@ DEV_ACCOUNT, ECR_REPO_ARN, MODEL_BUCKET_ARN, + DEFAULT_DEPLOYMENT_REGION, + CREATE_BATCH_PIPELINE, + CREATE_ENDPOINT, + PREPROD_ACCOUNT, + PROD_ACCOUNT ) from datetime import datetime, timezone @@ -45,6 +56,7 @@ from yamldataclassconfig import create_file_path_field from config.config_mux import StageYamlDataClassConfig +from deploy_endpoint.batch_inference_pipeline import get_pipeline @dataclass class EndpointConfigProductionVariant(StageYamlDataClassConfig): @@ -103,6 +115,28 @@ def __init__( Tags.of(self).add("sagemaker:project-name", PROJECT_NAME) Tags.of(self).add("sagemaker:deployment-stage", Stack.of(self).stack_name) + if not CREATE_ENDPOINT and not CREATE_BATCH_PIPELINE: + raise Exception( + "Either create_endpoint or create_batch_pipeline must be True" + ) + + self.create_shared_resources() + + if self.is_create_endpoint(): + self.create_endpoint_resources() + + if self.is_create_batch_inference(): + self.create_batch_inference_resources() + + + def is_create_endpoint(self): + return CREATE_ENDPOINT == True + + def is_create_batch_inference(self): + return CREATE_BATCH_PIPELINE == True + + def create_shared_resources(self): + app_subnet_ids = CfnParameter( self, "subnet-ids", @@ -121,6 +155,7 @@ def __init__( default="/vpc/sg/id", ).value_as_string + # iam role that would be used by the model endpoint to run the inference model_execution_policy = iam.ManagedPolicy( self, @@ -163,10 +198,14 @@ def __init__( ) ) - model_execution_role = iam.Role( + self.model_execution_role = iam.Role( self, "ModelExecutionRole", - assumed_by=iam.ServicePrincipal("sagemaker.amazonaws.com"), + role_name=f"{PROJECT_NAME}-model-execution-role", + assumed_by=iam.CompositePrincipal( + iam.ServicePrincipal("lambda.amazonaws.com"), + iam.ServicePrincipal("sagemaker.amazonaws.com"), + ), managed_policies=[ 
model_execution_policy, iam.ManagedPolicy.from_aws_managed_policy_name( @@ -178,22 +217,22 @@ def __init__( # setup timestamp to be used to trigger the custom resource update event to retrieve latest approved model and to be used with model and endpoint config resources' names now = datetime.now().replace(tzinfo=timezone.utc) - timestamp = now.strftime("%Y%m%d%H%M%S") + self.timestamp = now.strftime("%Y%m%d%H%M%S") # get latest approved model package from the model registry (only from a specific model package group) - latest_approved_model_package = get_approved_package() + self.latest_approved_model_package = get_approved_package() # Sagemaker Model - model_name = f"{MODEL_PACKAGE_GROUP_NAME}-{timestamp}" + self.model_name = f"{MODEL_PACKAGE_GROUP_NAME}-{self.timestamp}" - model = sagemaker.CfnModel( + self.model = sagemaker.CfnModel( self, "Model", - execution_role_arn=model_execution_role.role_arn, - model_name=model_name, + execution_role_arn=self.model_execution_role.role_arn, + model_name=self.model_name, containers=[ sagemaker.CfnModel.ContainerDefinitionProperty( - model_package_name=latest_approved_model_package + model_package_name=self.latest_approved_model_package ) ], vpc_config=sagemaker.CfnModel.VpcConfigProperty( @@ -202,8 +241,10 @@ def __init__( ), ) + def create_endpoint_resources(self): + # Sagemaker Endpoint Config - endpoint_config_name = f"{MODEL_PACKAGE_GROUP_NAME}-ec-{timestamp}" + endpoint_config_name = f"{MODEL_PACKAGE_GROUP_NAME}-ec-{self.timestamp}" endpoint_config_production_variant = EndpointConfigProductionVariant() @@ -234,12 +275,12 @@ def __init__( kms_key_id=kms_key.key_id, production_variants=[ endpoint_config_production_variant.get_endpoint_config_production_variant( - model.model_name + self.model.model_name ) ], ) - endpoint_config.add_depends_on(model) + endpoint_config.add_depends_on(self.model) # Sagemaker Endpoint endpoint_name = f"{MODEL_PACKAGE_GROUP_NAME}-e" @@ -254,3 +295,110 @@ def __init__( endpoint.add_depends_on(endpoint_config) self.endpoint = endpoint + + def create_batch_inference_resources(self): + """ + create a sagemaker pipeline that contains batch inference + """ + prj_name = PROJECT_NAME.lower() + self.transform_bucket = f"{prj_name}-trnsfrm-{DEV_ACCOUNT}" + self.pipeline_name = f"{PROJECT_NAME}-transform" + + # upload code asset to the default bucket + BASE_DIR = os.path.dirname(os.path.realpath(__file__)) + + i_bucket = s3.Bucket( + self, + id=self.transform_bucket, + bucket_name=Fn.sub( + self.transform_bucket.replace(DEV_ACCOUNT, "${AWS::AccountId}") + ), + versioned=False, + removal_policy=RemovalPolicy.DESTROY, + encryption=s3.BucketEncryption.S3_MANAGED + ) + i_bucket.add_to_resource_policy( + iam.PolicyStatement( + sid="S3ServerAccessLogsPolicy", + actions=["s3:PutObject"], + resources=[ + i_bucket.arn_for_objects(key_pattern="*"), + i_bucket.bucket_arn, + ], + principals=[ + iam.ServicePrincipal("logging.s3.amazonaws.com") + ], + ) + ) + + # DEV account access to objects in the bucket + i_bucket.add_to_resource_policy( + iam.PolicyStatement( + sid="AddDevPermissions", + actions=["s3:*"], + resources=[ + i_bucket.arn_for_objects(key_pattern="*"), + i_bucket.bucket_arn, + ], + principals=[ + iam.ArnPrincipal(f"arn:aws:iam::{Aws.ACCOUNT_ID}:root"), + ], + ) + ) + + # PROD account access to objects in the bucket + i_bucket.add_to_resource_policy( + iam.PolicyStatement( + sid="AddCrossAccountPermissions", + actions=["s3:List*", "s3:Get*", "s3:Put*"], + resources=[ + i_bucket.arn_for_objects(key_pattern="*"), + 
i_bucket.bucket_arn, + ], + principals=[ + iam.ArnPrincipal(f"arn:aws:iam::{PREPROD_ACCOUNT}:root"), + iam.ArnPrincipal(f"arn:aws:iam::{PROD_ACCOUNT}:root"), + ] + ) + ) + source_scripts = s3_deployment.BucketDeployment( + self, + id=f"{prj_name}-source_scripts", + destination_bucket=i_bucket, + destination_key_prefix=f"{self.pipeline_name}/{self.timestamp}/source-scripts", + sources=[s3_deployment.Source.asset(path=f"{BASE_DIR}/source_scripts")], + role=self.model_execution_role, + ) + + pipeline_def = get_pipeline( + region=DEFAULT_DEPLOYMENT_REGION, + pipeline_name=self.pipeline_name, + base_job_prefix=self.timestamp, + role=f"arn:aws:iam::{DEV_ACCOUNT}:role/{PROJECT_NAME}-model-execution-role", + default_bucket=self.transform_bucket, + model_name=self.model.model_name, + ).definition() + + pipeline_def = pipeline_def.replace( + f"arn:aws:iam::{DEV_ACCOUNT}:role/", "arn:aws:iam::${AWS::AccountId}:role/" + ) + pipeline_def = pipeline_def.replace( + f"{prj_name}-trnsfrm-{DEV_ACCOUNT}", + f"{prj_name}-trnsfrm-" + "${AWS::AccountId}", + ) + + assert pipeline_def.count(f"sagemaker-{DEV_ACCOUNT}") == 0, "staging and prod account may not have sagemaker bucket" + + cfn_pipeline = sagemaker.CfnPipeline( + self, + self.pipeline_name, + pipeline_definition={"PipelineDefinitionBody": Fn.sub(pipeline_def)}, + pipeline_name=self.pipeline_name, + role_arn=self.model_execution_role.role_arn, + pipeline_description="sagemaker batch transform pipeline", + pipeline_display_name=self.pipeline_name, + tags=[ + CfnTag(key="sagemaker:project-id", value=PROJECT_ID), + CfnTag(key="sagemaker:project-name", value=PROJECT_NAME), + ], + ) \ No newline at end of file diff --git a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/source_scripts/preprocessing/README.md b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/source_scripts/preprocessing/README.md new file mode 100644 index 00000000..e69de29b diff --git a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/source_scripts/preprocessing/main.py b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/source_scripts/preprocessing/main.py new file mode 100644 index 00000000..03ff251d --- /dev/null +++ b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/source_scripts/preprocessing/main.py @@ -0,0 +1,134 @@ +# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# SPDX-License-Identifier: MIT-0 +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of this +# software and associated documentation files (the "Software"), to deal in the Software +# without restriction, including without limitation the rights to use, copy, modify, +# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to +# permit persons to whom the Software is furnished to do so. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, +# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A +# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT +# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION +# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE +# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
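# Illustrative sketch (not lines from this change): this script is the entry point that
# batch_inference_pipeline.get_process_step() runs through a ScriptProcessor, passing the
# dataset location via --input-data. Inside the processing container the invocation is
# roughly (the S3 URI is the pipeline's default InputDataUrl for eu-west-1):
#
#   python3 main.py --input-data s3://sagemaker-servicecatalog-seedcode-eu-west-1/dataset/abalone-dataset.csv
#
# The /opt/ml/processing paths used below exist only inside that container; the
# preprocessed CSV is written under /opt/ml/processing/output, which the step exposes as
# the ProcessingOutput named "transform" consumed by the transform step.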
+"""Feature engineers the abalone dataset.""" +import argparse +import logging +import os +import pathlib +import requests +import tempfile + +import boto3 +import numpy as np +import pandas as pd + +from sklearn.compose import ColumnTransformer +from sklearn.impute import SimpleImputer +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import StandardScaler, OneHotEncoder + +logger = logging.getLogger() +logger.setLevel(logging.INFO) +logger.addHandler(logging.StreamHandler()) + + +# Since we get a headerless CSV file we specify the column names here. +feature_columns_names = [ + "sex", + "length", + "diameter", + "height", + "whole_weight", + "shucked_weight", + "viscera_weight", + "shell_weight", +] +label_column = "rings" + +feature_columns_dtype = { + "sex": str, + "length": np.float64, + "diameter": np.float64, + "height": np.float64, + "whole_weight": np.float64, + "shucked_weight": np.float64, + "viscera_weight": np.float64, + "shell_weight": np.float64, +} +label_column_dtype = {"rings": np.float64} + + +def merge_two_dicts(x, y): + """Merges two dicts, returning a new copy.""" + z = x.copy() + z.update(y) + return z + +def do_preprocessing(input_data): + logger.debug("Starting preprocessing for batch transform.") + + base_dir = "/opt/ml/processing" + pathlib.Path(f"{base_dir}/data").mkdir(parents=True, exist_ok=True) + bucket = input_data.split("/")[2] + key = "/".join(input_data.split("/")[3:]) + file_name = key.split("/")[-1] + folder_name = key.split("/")[-2] if len(key.split("/")) > 1 else None + + logger.info("Downloading data from bucket: %s, key: %s", bucket, key) + file_path = f"{base_dir}/data/{file_name}" + s3 = boto3.resource("s3") + s3.Bucket(bucket).download_file(key, file_path) + + logger.debug("Reading downloaded data.") + df = pd.read_csv( + file_path, + header=None, + names=feature_columns_names + [label_column], + dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype), + ) + os.unlink(file_path) + + logger.debug("Defining transformers.") + numeric_features = list(feature_columns_names) + numeric_features.remove("sex") + numeric_transformer = Pipeline(steps=[("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler())]) + + categorical_features = ["sex"] + categorical_transformer = Pipeline( + steps=[ + ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), + ("onehot", OneHotEncoder(handle_unknown="ignore")), + ] + ) + + preprocess = ColumnTransformer( + transformers=[ + ("num", numeric_transformer, numeric_features), + ("cat", categorical_transformer, categorical_features), + ] + ) + + logger.info("Applying transforms.") + y = df.pop("rings") + X_pre = preprocess.fit_transform(df) + y_pre = y.to_numpy().reshape(len(y), 1) + dest_path = f"{base_dir}/output" + if len(key.split("/")) > 1: + dest_path = f"{dest_path}/{folder_name}" + if not os.path.exists(dest_path): + os.makedirs(dest_path) + pd.DataFrame(X_pre).to_csv(f"{dest_path}/{file_name}", header=False, index=False) + return + +if __name__ == "__main__": + logger.debug("Starting preprocessing for batch transform.") + parser = argparse.ArgumentParser() + parser.add_argument("--input-data", type=str, required=True) + args = parser.parse_args() + input_data = args.input_data + + do_preprocessing(input_data) \ No newline at end of file diff --git a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/source_scripts/preprocessing/requirements.txt 
b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/deploy_endpoint/source_scripts/preprocessing/requirements.txt new file mode 100644 index 00000000..e69de29b diff --git a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/requirements.txt b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/requirements.txt index c1dbbbff..00d5af4c 100644 --- a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/requirements.txt +++ b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/requirements.txt @@ -2,3 +2,4 @@ aws-cdk-lib boto3 constructs yamldataclassconfig +sagemaker diff --git a/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/upload_assets.py b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/upload_assets.py new file mode 100644 index 00000000..579768e9 --- /dev/null +++ b/mlops-multi-account-cdk/mlops-sm-project-template/seed_code/deploy_app/upload_assets.py @@ -0,0 +1,69 @@ +''' +loop through CDK generated assets and upload to s3 + +''' +from os import path, listdir, getenv +import boto3, shutil + +import logging +logging.basicConfig() + +logger = logging.getLogger(__file__.split('/')[-1]) +logger.setLevel(getenv("LOGLEVEL", "INFO")) + +def upload_assets_to_s3(account_id): + ''' + loop through cdk.out folder, identified generated assests, compress if needed, and then upload to s3 + ''' + + # assume role cdk-hnb659fds-deploy-role-870955006425-eu-west-1 + role_arn = f"arn:aws:iam::{account_id}:role/cdk-hnb659fds-file-publishing-role-{account_id}-eu-west-1" + sts_client = boto3.client('sts') + assumed_role_object = sts_client.assume_role(RoleArn = role_arn, RoleSessionName = 'AssumeRoleSession1') + assumed_credentials=assumed_role_object['Credentials'] + + _s3 = boto3.resource('s3', aws_access_key_id=assumed_credentials['AccessKeyId'], + aws_secret_access_key=assumed_credentials['SecretAccessKey'], + aws_session_token=assumed_credentials['SessionToken']) + + destination_bucket=f"cdk-hnb659fds-assets-{account_id}-eu-west-1" + logger.info(f"----------upload assets to s3 bucket: {destination_bucket}") + i_count = 0 + cdk_root = f'{path.dirname(path.realpath(__file__))}/cdk.out' + for item in listdir(cdk_root): + if item.startswith('asset.') and item.endswith('.zip'): + upload_one_file_to_s3(_s3, f'{cdk_root}/{item}', destination_bucket) + i_count += 1 + elif item.startswith('asset.'): + zip_path = compress_folder(f'{cdk_root}/{item}') + upload_one_file_to_s3(_s3, zip_path, destination_bucket) + i_count += 1 + else: + pass + + logger.info(f"----------uploaded {i_count} assets to s3 bucket: {destination_bucket}") + + +def compress_folder(folder_path): + ''' + compress the folder and return the path of the compressed file + ''' + logger.info(f"----------compressing folder: {folder_path}") + zip_file_path = shutil.make_archive(base_name=f'tmp/{folder_path}', + format='zip', + root_dir=folder_path, + base_dir=None) + logger.info(f"----------compressed folder: {folder_path}") + return zip_file_path + + +def upload_one_file_to_s3(s3, zip_path, destination_bucket): + ''' + upload one file to s3 + ''' + logger.info(f"----------uploading file: {zip_path} to s3 bucket: {destination_bucket}") + base_name = path.basename(zip_path) + assert base_name.startswith('asset.') and base_name.endswith('.zip') + # s3 = boto3.resource('s3') + s3.Bucket(destination_bucket).upload_file(zip_path, base_name[6:]) + logger.info(f"----------uploaded file: {zip_path} to s3 bucket: 
{destination_bucket}")