17 commits
a9bbe99
feat(rabbitmq): add enums, IRabbitMQDestination, plugin annotation, s…
tjwald Apr 30, 2026
742ab42
feat(rabbitmq): add VirtualHost, Queue, Exchange, Shovel resources an…
tjwald Apr 30, 2026
9d76dac
feat(rabbitmq): add builder extensions for VirtualHost, Queue, Exchan…
tjwald Apr 30, 2026
12b3be2
test(rabbitmq): add unit tests for child resources, provisioner, plug…
tjwald Apr 30, 2026
c3afc79
docs(rabbitmq): update README with child resources and plugin example…
tjwald Apr 30, 2026
943a901
green tests
tjwald May 1, 2026
207b416
feat(rabbitmq): add shovelName wire-name parameter to AddShovel
tjwald May 2, 2026
47704e2
feat(rabbitmq): upgrade shovel endpoint URI to ReferenceExpression?
tjwald May 2, 2026
ed3356c
feat(rabbitmq): move public types to Aspire.Hosting.ApplicationModel …
tjwald May 2, 2026
849c935
RabbitMQ: per-resource health check model with ProvisioningComplete TCS
tjwald May 2, 2026
81dcd65
RabbitMQ: add health check docs (spec + README section)
tjwald May 2, 2026
a6971ba
feat(rabbitmq): add policy support
tjwald May 2, 2026
0efe710
refactor(rabbitmq): CR fixes — fluent chains, polymorphic bindings, R…
tjwald May 3, 2026
b09f7f5
fix(rabbitmq): address Copilot review on #16705
tjwald May 3, 2026
9d97de8
updated model to fix arguments issue and related design issue. Reduce…
tjwald May 8, 2026
c22a22f
refactor(rabbitmq): resources own provisioning signal and lifecycle s…
tjwald May 8, 2026
ed650fd
refactor(rabbitmq): address PR #16705 review feedback and improve XML…
tjwald May 8, 2026
138 changes: 138 additions & 0 deletions docs/specs/rabbitmq-health-checks.md
@@ -0,0 +1,138 @@
# RabbitMQ child-resource health check design

This document captures the design intent and key decisions for how health checks work across
`Aspire.Hosting.RabbitMQ` child resources. It is aimed at contributors extending the integration.

For the user-facing contract see the [README](../../src/Aspire.Hosting.RabbitMQ/README.md#health-checks).

## Guiding principle

A child resource is `Healthy` iff it has been provisioned **exactly as declared** in the AppHost,
including every cross-cutting configuration that affects its runtime behaviour, and a live probe
confirms it still exists on the broker.

"Exactly as declared" is the key phrase. A queue with a TTL policy that failed to apply is not
what the user declared — it must be `Unhealthy` so that `WaitFor(queue)` blocks dependents.

## Design decisions

### Two signals per resource: lifecycle and health

Every RabbitMQ child resource participates in two independent signaling channels:

**Lifecycle signal** (Aspire resource state — `Starting` / `Running` / `FailedToStart`):
Reflects whether the provisioning attempt has been made and what its outcome was.
The resource transitions to `Running` once the broker call completes successfully.
If the broker call fails, the resource transitions to `FailedToStart`.
Resources that have not yet been reached by the provisioner remain in `Starting`.

**Health signal** (`ProvisionedTask` — a read-only `Task`):
A gate the health check awaits. Pending means provisioning has not completed yet (health check
returns `Degraded`). Completed successfully means provisioning succeeded and the live probe can run.
Faulted means provisioning failed (health check returns `Unhealthy`).

These two channels are intentionally separate: lifecycle state is visible in the Aspire dashboard
resource list; health state is visible in the health check panel and gates `WaitFor` dependents.
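
As a rough illustration of the health side of this split, the sketch below maps the state of a provisioning task to a health result. `SketchHealthStatus` and `ProvisioningGate` are hypothetical names introduced here (a local stand-in for `HealthStatus`, so the snippet compiles without package references), and treating a cancelled task like a faulted one is an assumption of the sketch, not something the design states.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for HealthStatus, declared locally for this sketch only.
public enum SketchHealthStatus { Unhealthy, Degraded, Healthy }

public static class ProvisioningGate
{
    // Maps the provisioning task state to a health result.
    // probeSucceeded stands in for the outcome of the live broker probe.
    public static SketchHealthStatus Evaluate(Task provisionedTask, bool probeSucceeded)
    {
        if (!provisionedTask.IsCompleted)
        {
            // Pending: the provisioner has not finished with this resource yet.
            return SketchHealthStatus.Degraded;
        }

        if (!provisionedTask.IsCompletedSuccessfully)
        {
            // Faulted (or cancelled, by assumption): provisioning failed.
            return SketchHealthStatus.Unhealthy;
        }

        // Provisioned: the live probe decides the final result.
        return probeSucceeded ? SketchHealthStatus.Healthy : SketchHealthStatus.Unhealthy;
    }
}
```

The lifecycle state does not appear in this function at all, which is the point of keeping the two channels separate.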

### Resource owns both signals

Each resource owns its provisioning signal (`ProvisionedTask`) and is responsible for publishing
its own lifecycle state transitions. The provisioner calls `ApplyAsync` (and for exchanges,
`ApplyBindingsAsync`) and the resource handles everything else internally.

The `TaskCompletionSource` is `private readonly` inside each resource. The interface exposes only
the read side: `Task ProvisionedTask { get; }`. This prevents the provisioner from signaling
arbitrary states independently of the actual broker call result.

`ApplyAsync` receives a `ResourceNotificationService` parameter so the resource can publish
`Starting` at entry and `Running` or `FailedToStart` at exit without any external coordination.
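
A minimal sketch of this ownership pattern follows. It is not the PR's implementation: `ILifecyclePublisher`, `RecordingLifecycle`, and `SketchQueueResource` are hypothetical types, the publisher is a simplified stand-in for `ResourceNotificationService`, and the broker call is passed in as a delegate so the snippet stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for the lifecycle side of ResourceNotificationService.
public interface ILifecyclePublisher
{
    void Publish(string resourceName, string state);
}

// Test helper that records every published transition in order.
public sealed class RecordingLifecycle : ILifecyclePublisher
{
    public List<string> States { get; } = new();

    public void Publish(string resourceName, string state) => States.Add($"{resourceName}:{state}");
}

public sealed class SketchQueueResource
{
    // The write side never leaves the resource; callers only see ProvisionedTask.
    private readonly TaskCompletionSource _provisioned =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    public SketchQueueResource(string name) => Name = name;

    public string Name { get; }

    public Task ProvisionedTask => _provisioned.Task;

    // declare stands in for the broker call. Failures are captured in
    // ProvisionedTask instead of being thrown to the provisioner.
    public async Task ApplyAsync(
        Func<CancellationToken, Task> declare,
        ILifecyclePublisher lifecycle,
        CancellationToken cancellationToken)
    {
        lifecycle.Publish(Name, "Starting");
        try
        {
            await declare(cancellationToken).ConfigureAwait(false);
            _provisioned.TrySetResult();
            lifecycle.Publish(Name, "Running");
        }
        catch (Exception ex)
        {
            _provisioned.TrySetException(ex);
            lifecycle.Publish(Name, "FailedToStart");
        }
    }
}
```

Because the provisioner only ever calls `ApplyAsync`, it has no way to complete or fault the signal without a broker call actually having run.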

### Per-resource provisioning signal

Each resource owns a `Task ProvisionedTask` that is completed (or faulted) when its own
provisioning step finishes. This isolates failures: if one queue fails to declare, only that
queue's health check reports `Unhealthy`. Sibling queues, exchanges, and shovels are unaffected.

When a vhost fails to create, the provisioner returns early and child resources are never reached.
Children remain in `Starting` with `ProvisionedTask` still pending — the health check returns
`Degraded` ("provisioning in progress"), which is semantically correct: provisioning never ran.
There is no cascade-fault of children; `FailedToStart` is reserved for resources whose own
provisioning attempt was made and failed.

### Lifecycle and health state matrix

| Situation | Lifecycle state | Health check result |
|---|---|---|
| Provisioner has not reached this resource yet | `Starting` | `Degraded` |
| Provisioning succeeded | `Running` | `Degraded` → `Healthy` after live probe |
| Provisioning failed | `FailedToStart` | `Unhealthy` |
| Exchange declared; bindings in progress | `Running` | `Degraded` |
| Exchange declared; bindings failed | `Running` | `Unhealthy` |

### Exchange is a special case: two-phase provisioning

Exchanges are declared in phase 2 and have their bindings applied in phase 3. The exchange
transitions to `Running` after successful declaration (it is live on the broker at that point).
`ProvisionedTask` is not completed until bindings also succeed. If bindings fail, the exchange
stays `Running` but `ProvisionedTask` faults, so the health check reports `Unhealthy`.

If declaration itself fails, the exchange transitions to `FailedToStart` and `ProvisionedTask`
faults immediately. The provisioner skips phase 3 for that exchange by checking
`exchange.ProvisionedTask.IsFaulted`.
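
The two-phase flow can be sketched as below, under stated assumptions: `SketchExchange` and `SketchProvisioner` are hypothetical types, the broker calls are delegates, and the lifecycle state is a plain string property rather than a notification-service update.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public sealed class SketchExchange
{
    private readonly TaskCompletionSource _provisioned =
        new(TaskCreationOptions.RunContinuationsAsynchronously);

    public SketchExchange(string name) => Name = name;

    public string Name { get; }

    public string LifecycleState { get; private set; } = "Starting";

    public Task ProvisionedTask => _provisioned.Task;

    // Phase 2: declare the exchange. Success moves the lifecycle to Running
    // but leaves ProvisionedTask pending until bindings are applied.
    public async Task ApplyAsync(Func<Task> declareExchange)
    {
        try
        {
            await declareExchange().ConfigureAwait(false);
            LifecycleState = "Running";
        }
        catch (Exception ex)
        {
            LifecycleState = "FailedToStart";
            _provisioned.TrySetException(ex);
        }
    }

    // Phase 3: apply bindings. Only now can ProvisionedTask complete successfully.
    public async Task ApplyBindingsAsync(Func<Task> applyBindings)
    {
        try
        {
            await applyBindings().ConfigureAwait(false);
            _provisioned.TrySetResult();
        }
        catch (Exception ex)
        {
            // Lifecycle stays Running; only the health signal faults.
            _provisioned.TrySetException(ex);
        }
    }
}

public static class SketchProvisioner
{
    public static async Task RunAsync(
        IReadOnlyList<SketchExchange> exchanges,
        Func<SketchExchange, Task> declare,
        Func<SketchExchange, Task> bind)
    {
        foreach (var exchange in exchanges)
        {
            await exchange.ApplyAsync(() => declare(exchange)).ConfigureAwait(false);
        }

        foreach (var exchange in exchanges)
        {
            // Skip phase 3 for exchanges whose declaration already failed.
            if (exchange.ProvisionedTask.IsFaulted)
            {
                continue;
            }

            await exchange.ApplyBindingsAsync(() => bind(exchange)).ConfigureAwait(false);
        }
    }
}
```

A failure in one exchange does not stop the loop, so sibling exchanges still get declared and bound.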

### Two-stage health check: provisioning signal + live probe

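The provisioning signal proves "we sent the declare and the broker accepted it." The live probe
proves "the entity still exists", catching out-of-band deletion by an operator. Both stages are
required for correctness. In `RabbitMQProvisionableHealthCheck` the provisioning stage is split
into three steps (a pending check, the resource's own `ProvisionedTask`, then each
`HealthDependencies` task) that all run before the live probe.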

### Resource owns its own health semantics

Each resource type knows how to verify itself (existence check, connection check, state check).
This keeps health-check registration in the builder extensions trivial and uniform — every `Add*`
call site uses the same one-liner helper with no per-resource parameters.

### Probe result type is separate from `HealthCheckResult`

Resource classes return a lightweight domain type rather than `HealthCheckResult` directly. This
keeps `Microsoft.Extensions.Diagnostics.HealthChecks` out of the resource model layer, which is
important for testability and layering.

### Binding failures are attributed to the source exchange only

Bindings are declared on the exchange; routing is the exchange's responsibility. The destination
queue's own behaviour is unaffected by a missing binding. Propagating to the destination would
fan-in failures from many exchanges onto one queue and obscure the root cause.

### Shovel failures are isolated to the shovel resource

Shovels move messages between otherwise-independent endpoints. If a shovel fails, the source queue
still exists and is correctly configured. The shovel's live-state probe naturally catches downstream
breakage without needing to cascade to source or destination.

### Policy failures cascade to matching queues/exchanges

Unlike bindings, a policy changes the behaviour of the entity itself (TTL, max-length, DLX, HA).
A queue without its declared TTL policy will silently retain messages forever — a correctness bug
the user cannot observe from "queue exists = true". Therefore a policy failure marks every
queue/exchange whose name matches the policy pattern as `Unhealthy`.

Policy-to-entity matching is resolved once after the model is fully built (not at `AddPolicy` call
time, to avoid order-dependency) and cached on each entity via `AppliedPolicies`. The same
resolution pass adds a dashboard relationship edge so the cascade is visible without reading logs.
This behaviour is implemented and covered by tests.
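
The resolution pass might look roughly like the sketch below. It assumes the policy pattern is a regular expression matched against entity names and that `apply-to` is one of `queues`, `exchanges`, or `all`; .NET's `Regex` is used as an approximation of the broker's own matching. `SketchPolicy`, `SketchEntity`, and `PolicyResolver` are hypothetical names, and policy priority is ignored.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Text.RegularExpressions;

public sealed record SketchPolicy(string Name, string Pattern, string ApplyTo);

// Kind is "queues" or "exchanges" so it can be compared with ApplyTo directly.
public sealed record SketchEntity(string Name, string Kind);

public static class PolicyResolver
{
    // Runs once over the finished model and returns, per entity, the names of
    // every policy whose scope and pattern match it.
    public static IReadOnlyDictionary<SketchEntity, IReadOnlyList<string>> Resolve(
        IEnumerable<SketchEntity> entities,
        IReadOnlyList<SketchPolicy> policies)
    {
        return entities.ToDictionary(
            entity => entity,
            entity => (IReadOnlyList<string>)policies
                .Where(policy => policy.ApplyTo == "all" || policy.ApplyTo == entity.Kind)
                .Where(policy => Regex.IsMatch(entity.Name, policy.Pattern))
                .Select(policy => policy.Name)
                .ToList());
    }
}
```

Resolving against the complete model, rather than inside each `AddPolicy` call, means a policy declared before its matching queue still ends up in that queue's list.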

## Extension guidance

When adding a new provisionable resource type:

- Keep the `TaskCompletionSource` `private readonly`; expose only `Task ProvisionedTask { get; }`.
- In `ApplyAsync`, publish `Starting` at entry, then do the broker work.
  On success: complete the TCS and publish `Running`.
  On failure: fault the TCS and publish `FailedToStart`.
- Implement a live probe appropriate to the entity type (existence, state, connectivity).
- Declare health dependencies if the resource's correctness depends on other provisionables
  (e.g. policies applied to it).
- Register the health check using the shared helper — no bespoke registration logic.
- Add the resource to the appropriate provisioner phase; capture failures per-entity without
  short-circuiting siblings. The provisioner must not touch the TCS directly.
4 changes: 4 additions & 0 deletions src/Aspire.Hosting.RabbitMQ/Aspire.Hosting.RabbitMQ.csproj
@@ -15,4 +15,8 @@
    <ProjectReference Include="..\Aspire.Hosting\Aspire.Hosting.csproj" />
  </ItemGroup>

  <ItemGroup>
    <InternalsVisibleTo Include="Aspire.Hosting.RabbitMQ.Tests" />
  </ItemGroup>

</Project>
20 changes: 20 additions & 0 deletions src/Aspire.Hosting.RabbitMQ/IRabbitMQServerChild.cs
@@ -0,0 +1,20 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.Hosting.ApplicationModel;

/// <summary>
/// Implemented by resources that are children of a RabbitMQ server and are reachable via a virtual host.
/// </summary>
/// <remarks>
/// Implemented by <see cref="RabbitMQVirtualHostResource"/> (returns <c>this</c>),
/// <see cref="RabbitMQDestination"/> (queues and exchanges, returns the parent virtual host),
/// <see cref="RabbitMQShovelResource"/>, and <see cref="RabbitMQPolicyResource"/>.
/// Used internally to derive the server name for health-check registration without requiring
/// it to be passed explicitly at every call site.
/// </remarks>
internal interface IRabbitMQServerChild
{
    /// <summary>Gets the virtual host that owns this resource.</summary>
    RabbitMQVirtualHostResource VirtualHost { get; }
}
21 changes: 21 additions & 0 deletions src/Aspire.Hosting.RabbitMQ/IResourceWithExchangeArguments.cs
@@ -0,0 +1,21 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.Hosting.ApplicationModel;

/// <summary>
/// Indicates that a RabbitMQ resource exposes exchange-specific arguments such as the alternate exchange
/// for unroutable messages.
/// </summary>
/// <remarks>
/// Implemented by <see cref="RabbitMQExchangeResource"/> and <see cref="RabbitMQPolicyResource"/>.
/// Use <see cref="RabbitMQBuilderExtensions.WithExchangeArguments{T}"/> or
/// <see cref="RabbitMQBuilderExtensions.WithAlternateExchange{T}"/> to configure these settings.
/// </remarks>
public interface IResourceWithExchangeArguments : IResource
{
    /// <summary>
    /// Gets the exchange arguments for this resource.
    /// </summary>
    RabbitMQExchangeArguments ExchangeArguments { get; }
}
21 changes: 21 additions & 0 deletions src/Aspire.Hosting.RabbitMQ/IResourceWithQueueArguments.cs
@@ -0,0 +1,21 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.Hosting.ApplicationModel;

/// <summary>
/// Indicates that a RabbitMQ resource exposes queue-specific arguments such as message TTL,
/// length limits, and dead-letter routing.
/// </summary>
/// <remarks>
/// Implemented by <see cref="RabbitMQQueueResource"/> and <see cref="RabbitMQPolicyResource"/>.
/// Use <see cref="RabbitMQBuilderExtensions.WithQueueArguments{T}"/> or
/// <see cref="RabbitMQBuilderExtensions.WithDeadLetterExchange{T}"/> to configure these settings.
/// </remarks>
public interface IResourceWithQueueArguments : IResource
{
    /// <summary>
    /// Gets the queue arguments for this resource.
    /// </summary>
    RabbitMQQueueArguments QueueArguments { get; }
}
38 changes: 38 additions & 0 deletions src/Aspire.Hosting.RabbitMQ/Provisioning/IRabbitMQProvisionable.cs
@@ -0,0 +1,38 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Aspire.Hosting.ApplicationModel;

namespace Aspire.Hosting.RabbitMQ.Provisioning;

/// <summary>
/// Implemented by RabbitMQ resources that can be provisioned against a live broker and verify their own health.
/// </summary>
internal interface IRabbitMQProvisionable
{
    /// <summary>Gets the resource name, used in health-check error messages.</summary>
    string Name { get; }

    /// <summary>
    /// Completes when this resource has been fully provisioned; faulted if provisioning failed.
    /// </summary>
    Task ProvisionedTask { get; }

    /// <summary>
    /// Applies this resource to the broker. Implementations must not throw; all failures are captured in <see cref="ProvisionedTask"/>.
    /// </summary>
    Task ApplyAsync(IRabbitMQProvisioningClient client, ResourceNotificationService notifications, ResourceLoggerService resourceLogger, CancellationToken cancellationToken);

    /// <summary>
    /// Returns the set of other provisionable resources that must complete successfully before this resource's health check reports <c>Healthy</c>.
    /// Defaults to an empty sequence.
    /// </summary>
    IEnumerable<IRabbitMQProvisionable> HealthDependencies => [];

    /// <summary>
    /// Performs a live broker probe to verify that this resource exists and is in the expected state.
    /// Defaults to <see cref="RabbitMQProbeResult.Healthy"/> (no probe needed).
    /// </summary>
    ValueTask<RabbitMQProbeResult> ProbeAsync(IRabbitMQProvisioningClient client, CancellationToken cancellationToken)
        => ValueTask.FromResult(RabbitMQProbeResult.Healthy);
}
@@ -0,0 +1,24 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.Hosting.RabbitMQ.Provisioning;

internal interface IRabbitMQProvisioningClient : IAsyncDisposable
{
    Task<bool> CanConnectAsync(string vhost, CancellationToken ct);

    // AMQP
    Task DeclareExchangeAsync(string vhost, string name, string type, bool durable, bool autoDelete, IDictionary<string, object?>? args, CancellationToken ct);
    Task DeclareQueueAsync(string vhost, string name, bool durable, bool exclusive, bool autoDelete, IDictionary<string, object?>? args, CancellationToken ct);
    Task BindQueueAsync(string vhost, string sourceExchange, string queue, string routingKey, IDictionary<string, object?>? args, CancellationToken ct);
    Task BindExchangeAsync(string vhost, string sourceExchange, string destExchange, string routingKey, IDictionary<string, object?>? args, CancellationToken ct);
    Task<bool> QueueExistsAsync(string vhost, string name, CancellationToken ct);
    Task<bool> ExchangeExistsAsync(string vhost, string name, CancellationToken ct);

    // Management HTTP
    Task CreateVirtualHostAsync(string vhost, CancellationToken ct);
    Task PutShovelAsync(string vhost, string name, RabbitMQShovelDefinition def, CancellationToken ct);
    Task<string?> GetShovelStateAsync(string vhost, string name, CancellationToken ct);
    Task PutPolicyAsync(string vhost, string name, RabbitMQPolicyDefinition def, CancellationToken ct);
    Task<bool> PolicyExistsAsync(string vhost, string name, CancellationToken ct);
}
@@ -0,0 +1,15 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Text.Json.Serialization;

namespace Aspire.Hosting.RabbitMQ.Provisioning;

/// <summary>
/// JSON payload for <c>PUT /api/policies/{vhost}/{name}</c>.
/// </summary>
internal sealed record RabbitMQPolicyDefinition(
    [property: JsonPropertyName("pattern")] string Pattern,
    [property: JsonPropertyName("apply-to")] string ApplyTo,
    [property: JsonPropertyName("definition")] IDictionary<string, object?> Definition,
    [property: JsonPropertyName("priority")] int Priority);
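
Reviewer-style note: to make the wire format concrete, the sketch below serializes a local copy of this record with `System.Text.Json` and builds the request path. `SketchPolicyDefinition` and `PolicyPayload` are hypothetical names, the `message-ttl` value is an illustrative input, and how the real client builds its request is not shown in this diff. The virtual host segment is percent-encoded, so the default vhost `/` becomes `%2F`.

```csharp
using System;
using System.Collections.Generic;
using System.Text.Json;
using System.Text.Json.Serialization;

// Local copy of the record above so the sketch is self-contained.
public sealed record SketchPolicyDefinition(
    [property: JsonPropertyName("pattern")] string Pattern,
    [property: JsonPropertyName("apply-to")] string ApplyTo,
    [property: JsonPropertyName("definition")] IDictionary<string, object?> Definition,
    [property: JsonPropertyName("priority")] int Priority);

public static class PolicyPayload
{
    // Builds the relative path for PUT /api/policies/{vhost}/{name},
    // percent-encoding both segments.
    public static string BuildPath(string vhost, string name) =>
        $"/api/policies/{Uri.EscapeDataString(vhost)}/{Uri.EscapeDataString(name)}";

    // Serializes the definition using the JSON property names declared on the record.
    public static string BuildBody(SketchPolicyDefinition definition) =>
        JsonSerializer.Serialize(definition);
}
```

The `[property: ...]` attribute target matters here: on a positional record, an untargeted attribute would apply to the constructor parameter instead of the generated property.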
18 changes: 18 additions & 0 deletions src/Aspire.Hosting.RabbitMQ/Provisioning/RabbitMQProbeResult.cs
@@ -0,0 +1,18 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace Aspire.Hosting.RabbitMQ.Provisioning;

/// <summary>
/// The result of a live broker probe performed by a RabbitMQ resource health check.
/// Kept separate from <c>HealthCheckResult</c> so that resource model classes do not
/// take a dependency on <c>Microsoft.Extensions.Diagnostics.HealthChecks</c>.
/// </summary>
internal readonly record struct RabbitMQProbeResult(bool IsHealthy, string? Description = null)
{
    /// <summary>Gets a healthy probe result.</summary>
    public static RabbitMQProbeResult Healthy { get; } = new(true);

    /// <summary>Returns an unhealthy probe result with the supplied description.</summary>
    public static RabbitMQProbeResult Unhealthy(string description) => new(false, description);
}
@@ -0,0 +1,73 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using Microsoft.Extensions.Diagnostics.HealthChecks;
using Microsoft.Extensions.Logging;

namespace Aspire.Hosting.RabbitMQ.Provisioning;

/// <summary>
/// Shared <see cref="IHealthCheck"/> implementation for all RabbitMQ child resources (virtual hosts, queues, exchanges, shovels, policies).
/// </summary>
/// <remarks>
/// The check proceeds in four stages: returns <see cref="HealthStatus.Degraded"/> while provisioning is in progress,
/// then awaits <see cref="IRabbitMQProvisionable.ProvisionedTask"/>, then awaits each
/// <see cref="IRabbitMQProvisionable.HealthDependencies"/> task, and finally calls
/// <see cref="IRabbitMQProvisionable.ProbeAsync"/> for a live broker verification.
/// </remarks>
internal sealed class RabbitMQProvisionableHealthCheck(IRabbitMQProvisionable self, IRabbitMQProvisioningClient client, ILogger<RabbitMQProvisionableHealthCheck> logger) : IHealthCheck
{
    public async Task<HealthCheckResult> CheckHealthAsync(HealthCheckContext context, CancellationToken cancellationToken = default)
    {
        // Stage 1: return Degraded immediately if provisioning hasn't completed yet
        if (!self.ProvisionedTask.IsCompleted)
        {
            return HealthCheckResult.Degraded($"Provisioning of '{self.Name}' is in progress.");
        }

        // Stage 2: own provisioning
        try
        {
            await self.ProvisionedTask.WaitAsync(cancellationToken).ConfigureAwait(false);
        }
        catch (OperationCanceledException)
        {
            throw;
        }
        catch (Exception ex)
        {
            var message = $"Provisioning of '{self.Name}' failed: {ex.Message}";
            logger.LogWarning(ex, "{Message}", message);
            return HealthCheckResult.Unhealthy(message, ex);
        }

        // Stage 3: health dependencies (e.g. policies that apply to this queue/exchange)
        foreach (var dep in self.HealthDependencies)
        {
            try
            {
                await dep.ProvisionedTask.WaitAsync(cancellationToken).ConfigureAwait(false);
            }
            catch (OperationCanceledException)
            {
                throw;
            }
            catch (Exception ex)
            {
                var message = $"Dependent resource '{dep.Name}' failed to provision: {ex.Message}";
                logger.LogWarning(ex, "{Message}", message);
                return HealthCheckResult.Unhealthy(message, ex);
            }
        }

        // Stage 4: live broker probe
        var probe = await self.ProbeAsync(client, cancellationToken).ConfigureAwait(false);
        if (!probe.IsHealthy)
        {
            logger.LogWarning("Health probe for '{Resource}' failed: {Reason}", self.Name, probe.Description);
            return HealthCheckResult.Unhealthy(probe.Description);
        }

        return HealthCheckResult.Healthy();
    }
}