[ML] Flag updates from Inference #131725
base: main
Changes from all commits
b124b8f
061cc5e
9ddaec4
b97a4ca
0107a7c
00f7ba7
@@ -27,11 +27,17 @@
 import java.io.IOException;
 import java.util.Objects;

+import static org.elasticsearch.TransportVersions.INFERENCE_UPDATE_ML;
 import static org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction.Request.ADAPTIVE_ALLOCATIONS;
 import static org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction.Request.MODEL_ID;
 import static org.elasticsearch.xpack.core.ml.action.StartTrainedModelDeploymentAction.Request.NUMBER_OF_ALLOCATIONS;

 public class UpdateTrainedModelDeploymentAction extends ActionType<CreateTrainedModelAssignmentAction.Response> {
+    public enum Source {
+        API,
+        ADAPTIVE_ALLOCATIONS,
+        INFERENCE
+    }

     public static final UpdateTrainedModelDeploymentAction INSTANCE = new UpdateTrainedModelDeploymentAction();
     public static final String NAME = "cluster:admin/xpack/ml/trained_models/deployment/update";
@@ -73,7 +79,7 @@ public static Request parseRequest(String deploymentId, XContentParser parser) {
         private String deploymentId;
         private Integer numberOfAllocations;
         private AdaptiveAllocationsSettings adaptiveAllocationsSettings;
-        private boolean isInternal;
+        private Source source = Source.API;

         private Request() {
             super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
@@ -90,11 +96,17 @@ public Request(StreamInput in) throws IOException {
             if (in.getTransportVersion().before(TransportVersions.V_8_16_0)) {
                 numberOfAllocations = in.readVInt();
                 adaptiveAllocationsSettings = null;
-                isInternal = false;
+                source = Source.API;
             } else {
                 numberOfAllocations = in.readOptionalVInt();
                 adaptiveAllocationsSettings = in.readOptionalWriteable(AdaptiveAllocationsSettings::new);
-                isInternal = in.readBoolean();
+                if (in.getTransportVersion().before(INFERENCE_UPDATE_ML)) {
+                    // we changed over from a boolean to an enum
+                    // when it was a boolean, true came from adaptive allocations and false came from the rest api
+                    source = in.readBoolean() ? Source.ADAPTIVE_ALLOCATIONS : Source.API;
+                } else {
+                    source = in.readEnum(Source.class);
+                }
             }
         }
@@ -119,11 +131,15 @@ public void setAdaptiveAllocationsSettings(AdaptiveAllocationsSettings adaptiveA
         }

         public boolean isInternal() {
-            return isInternal;
+            return source == Source.INFERENCE || source == Source.ADAPTIVE_ALLOCATIONS;
         }

-        public void setIsInternal(boolean isInternal) {
-            this.isInternal = isInternal;
+        public void setSource(Source source) {
+            this.source = source != null ? source : this.source;
+        }
+
+        public Source getSource() {
+            return source;
         }

         public AdaptiveAllocationsSettings getAdaptiveAllocationsSettings() {

(Inline review comment on isInternal()) Can you confirm that we do want […]

(Reply) Confirmed! Yeah inference update code previously set […]
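For orientation, here is a hedged sketch of how a caller that previously called setIsInternal(true), such as the inference update path mentioned in the thread above, could tag its request under the new enum. The Request(String deploymentId) constructor and the setNumberOfAllocations setter are assumed from the wider class; neither appears in this diff.

    // Hedged usage sketch, not code from this PR; constructor and setter names are assumptions.
    UpdateTrainedModelDeploymentAction.Request request =
        new UpdateTrainedModelDeploymentAction.Request("my-elser-deployment");
    request.setNumberOfAllocations(2);
    // Before this change the inference update path called setIsInternal(true);
    // with the enum it identifies itself explicitly instead:
    request.setSource(UpdateTrainedModelDeploymentAction.Source.INFERENCE);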
@@ -139,7 +155,14 @@ public void writeTo(StreamOutput out) throws IOException {
             } else {
                 out.writeOptionalVInt(numberOfAllocations);
                 out.writeOptionalWriteable(adaptiveAllocationsSettings);
-                out.writeBoolean(isInternal);
+                if (out.getTransportVersion().before(INFERENCE_UPDATE_ML)) {
+                    // we changed over from a boolean to an enum
+                    // when it was a boolean, true came from adaptive allocations and false came from the rest api
+                    // "inference" is written as internal (true) here, matching the old behaviour of the inference update path
+                    out.writeBoolean(isInternal());
+                } else {
+                    out.writeEnum(source);
+                }
             }
         }

(Inline review comment on writeBoolean(isInternal())) Do we need to determine if […]

(Reply) Previously, we set the boolean to true if the source was either from the inference update api or the adaptive allocations autoscaler.
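As a hedged illustration of the two version checks above (not code from the PR; these helpers do not exist in the codebase), the read and write branches amount to the following mapping between the old boolean and the new enum:

    // Standalone sketch of the legacy-wire mapping implied by the read and write branches.
    final class LegacySourceMapping {

        // Write side: what a node on a pre-INFERENCE_UPDATE_ML transport version receives.
        // Mirrors isInternal(): ADAPTIVE_ALLOCATIONS and INFERENCE both become true.
        static boolean toLegacyBoolean(UpdateTrainedModelDeploymentAction.Source source) {
            return source == UpdateTrainedModelDeploymentAction.Source.ADAPTIVE_ALLOCATIONS
                || source == UpdateTrainedModelDeploymentAction.Source.INFERENCE;
        }

        // Read side: how a boolean written at the older version is interpreted.
        static UpdateTrainedModelDeploymentAction.Source fromLegacyBoolean(boolean isInternal) {
            return isInternal
                ? UpdateTrainedModelDeploymentAction.Source.ADAPTIVE_ALLOCATIONS
                : UpdateTrainedModelDeploymentAction.Source.API;
        }
    }

One consequence worth noting: a request tagged INFERENCE that is serialized at the older transport version is read back as ADAPTIVE_ALLOCATIONS, so isInternal() still holds on the receiving side, but the INFERENCE/ADAPTIVE_ALLOCATIONS distinction is lost for that hop.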
@@ -161,10 +184,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
         public ActionRequestValidationException validate() {
             ActionRequestValidationException validationException = new ActionRequestValidationException();
             if (numberOfAllocations != null) {
-                if (numberOfAllocations < 0 || (isInternal == false && numberOfAllocations == 0)) {
+                if (numberOfAllocations < 0 || (isInternal() == false && numberOfAllocations == 0)) {
                     validationException.addValidationError("[" + NUMBER_OF_ALLOCATIONS + "] must be a positive integer");
                 }
-                if (isInternal == false
+                if (isInternal() == false
                     && adaptiveAllocationsSettings != null
                     && adaptiveAllocationsSettings.getEnabled() == Boolean.TRUE) {
                     validationException.addValidationError(
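A hedged example of what switching from the field to isInternal() means for validation. It assumes the public Request(String deploymentId) constructor and setNumberOfAllocations setter from the wider class, plus the usual convention that validate() reports no error for a well-formed request; none of this snippet is part of the PR.

    // Illustrative only; constructor and setter names are assumptions not shown in this diff.
    UpdateTrainedModelDeploymentAction.Request request =
        new UpdateTrainedModelDeploymentAction.Request("my-deployment");
    request.setNumberOfAllocations(0);

    // Default source is API, so isInternal() is false and zero allocations
    // trips the "[number_of_allocations] must be a positive integer" error.
    ActionRequestValidationException userError = request.validate();

    // The same payload tagged as coming from the adaptive allocations autoscaler
    // counts as internal, so the zero-allocations check above is skipped.
    request.setSource(UpdateTrainedModelDeploymentAction.Source.ADAPTIVE_ALLOCATIONS);
    ActionRequestValidationException internalError = request.validate();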
@@ -183,7 +206,7 @@ public ActionRequestValidationException validate() {

         @Override
         public int hashCode() {
-            return Objects.hash(deploymentId, numberOfAllocations, adaptiveAllocationsSettings, isInternal);
+            return Objects.hash(deploymentId, numberOfAllocations, adaptiveAllocationsSettings, source);
         }

         @Override
@@ -198,7 +221,7 @@ public boolean equals(Object obj) {
             return Objects.equals(deploymentId, other.deploymentId)
                 && Objects.equals(numberOfAllocations, other.numberOfAllocations)
                 && Objects.equals(adaptiveAllocationsSettings, other.adaptiveAllocationsSettings)
-                && isInternal == other.isInternal;
+                && source == other.source;
         }

         @Override
(Review comment) I'm missing a bit of context: why do we need to distinguish between these cases?
(Review comment) Is there a corresponding Serverless PR?
(Reply) Yeah, let me ping you with the internal documentation.
(Reply) We need to allow updates to num_allocations in serverless that originate from the AdaptiveAllocationsScalerService (ADAPTIVE_ALLOCATIONS), but we want to disallow updates from users (API and INFERENCE). The only alternative I thought of was refactoring AdaptiveAllocationsScalerService to update directly rather than through the API, but that felt more intrusive.
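To make that concrete, here is a hypothetical sketch of the kind of serverless-side gate this enables; no such check exists in this PR, and the method name and exception choice are illustrative only.

    // Hypothetical guard, not part of this PR: allow only the adaptive allocations
    // autoscaler to change the number of allocations in serverless, and reject
    // user-originated updates whether they arrive via the ML API or the inference update API.
    static void ensureAllowedInServerless(UpdateTrainedModelDeploymentAction.Request request) {
        if (request.getSource() != UpdateTrainedModelDeploymentAction.Source.ADAPTIVE_ALLOCATIONS) {
            throw new IllegalArgumentException(
                "updating [number_of_allocations] is not supported in serverless; request source was ["
                    + request.getSource()
                    + "]"
            );
        }
    }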