Skip to content

Add stubs for channel pool health checking #2634

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 32 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
1cd5e4a
add stubs and make changes to BigtableChannelPool to allow for integr…
nicholsl Jul 16, 2025
08eb7af
add license headers
nicholsl Jul 16, 2025
cc76959
chore: fix viewConfig in AutomatedBackupPolicy (#2604)
mutianf Jun 16, 2025
b97ff6e
chore: integrate channel pool fixes (#2599)
nicholsl Jun 16, 2025
caa1f89
Update BigtableMaterializedViewIT.java (#2606)
ron-gal Jun 17, 2025
9210c56
test(bigtable): fix mv tests (#2608)
ron-gal Jun 18, 2025
8f15829
tests(bigtable): fix mv tests zone (#2609)
ron-gal Jun 20, 2025
8865a00
ci: allow nonexistent profile in google-cloud-bigtable (#2611)
suztomo Jun 23, 2025
22e6300
ci: add name elements for the POM.xml files (#2612)
suztomo Jun 23, 2025
8c1ac4c
fix: populate table id for materialized view (#2610)
mutianf Jun 24, 2025
1fd64f2
deps: update shared dependencies (#2605)
renovate-bot Jun 27, 2025
f522c8a
chore(main): release 2.61.0 (#2600)
release-please[bot] Jun 27, 2025
21b9278
chore(main): release 2.61.1-SNAPSHOT (#2615)
release-please[bot] Jun 30, 2025
2d08bd5
chore: Update generation configuration at Wed Jul 2 02:33:52 UTC 202…
cloud-java-bot Jul 2, 2025
5a0033c
feat(bigtable): Add schema bundle support (#2619)
ron-gal Jul 9, 2025
039c16a
feat: next release from main branch is 2.62.0 (#2621)
jinseopkim0 Jul 10, 2025
e76b8ca
deps: minor cleanup (#2623)
igorbernstein2 Jul 11, 2025
97a4930
chore: fix old build todo (#2625)
igorbernstein2 Jul 14, 2025
bc08faf
deps: update shared dependencies (#2616)
renovate-bot Jul 15, 2025
11938df
chore(main): release 2.62.0 (#2617)
release-please[bot] Jul 16, 2025
0372cd3
test: deflake prepare plan refresh timeout test (#2628)
jackdingilian Jul 16, 2025
f3c6734
lint
nicholsl Jul 16, 2025
48c7498
Merge branch 'main' into feature/channel-pool-health-checking-stub
nicholsl Jul 16, 2025
8d39504
documentation
nicholsl Jul 16, 2025
86958ad
lint
nicholsl Jul 16, 2025
2cc217c
lint
nicholsl Jul 17, 2025
4229883
lint
nicholsl Jul 17, 2025
c70f7ff
rearchitect so that channelpoolhealthchecker doesn't keep a separate …
nicholsl Jul 21, 2025
94895d5
have all operations related to proberesults be handled by the Channel…
nicholsl Jul 21, 2025
30af3b6
lint
nicholsl Jul 21, 2025
c6f8aef
Merge branch 'googleapis:main' into feature/channel-pool-health-check…
nicholsl Jul 21, 2025
243a5dd
Merge branch 'main' into feature/channel-pool-health-checking-stub
nicholsl Jul 22, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,13 @@ public class BigtableChannelPool extends ManagedChannel {
private final BigtableChannelPoolSettings settings;
private final ChannelFactory channelFactory;
private final ScheduledExecutorService executor;
private final ScheduledExecutorService channelHealthProbingExecutor =
Executors.newSingleThreadScheduledExecutor();

private final Object entryWriteLock = new Object();
@VisibleForTesting final AtomicReference<ImmutableList<Entry>> entries = new AtomicReference<>();
private final ChannelPoolHealthChecker channelPoolHealthChecker =
new ChannelPoolHealthChecker(() -> entries.get());
private final AtomicInteger indexTicker = new AtomicInteger();
private final String authority;

Expand Down Expand Up @@ -92,7 +96,8 @@ public static BigtableChannelPool create(
ImmutableList.Builder<Entry> initialListBuilder = ImmutableList.builder();

for (int i = 0; i < settings.getInitialChannelCount(); i++) {
initialListBuilder.add(new Entry(channelFactory.createSingleChannel()));
initialListBuilder.add(
new Entry(channelFactory.createSingleChannel(), channelHealthProbingExecutor));
}

entries.set(initialListBuilder.build());
Expand Down Expand Up @@ -150,6 +155,9 @@ public ManagedChannel shutdown() {
// shutdownNow will cancel scheduled tasks
executor.shutdownNow();
}
if (channelHealthProbingExecutor != null) {
channelHealthProbingExecutor.shutdownNow();
}
return this;
}

Expand Down Expand Up @@ -190,6 +198,9 @@ public ManagedChannel shutdownNow() {
if (executor != null) {
executor.shutdownNow();
}
if (channelHealthProbingExecutor != null) {
channelHealthProbingExecutor.shutdownNow();
}
return this;
}

Expand Down Expand Up @@ -316,7 +327,8 @@ private void expand(int desiredSize) {

for (int i = 0; i < desiredSize - localEntries.size(); i++) {
try {
newEntries.add(new Entry(channelFactory.createSingleChannel()));
newEntries.add(
new Entry(channelFactory.createSingleChannel(), channelHealthProbingExecutor));
} catch (IOException e) {
LOG.log(Level.WARNING, "Failed to add channel", e);
}
Expand Down Expand Up @@ -354,7 +366,8 @@ void refresh() {

for (int i = 0; i < newEntries.size(); i++) {
try {
newEntries.set(i, new Entry(channelFactory.createSingleChannel()));
newEntries.set(
i, new Entry(channelFactory.createSingleChannel(), channelHealthProbingExecutor));
} catch (IOException e) {
LOG.log(Level.WARNING, "Failed to refresh channel, leaving old channel", e);
}
Expand Down Expand Up @@ -430,14 +443,26 @@ static class Entry {
@VisibleForTesting final AtomicInteger outstandingRpcs = new AtomicInteger(0);

private final AtomicInteger maxOutstanding = new AtomicInteger();
private final AtomicInteger probesInFlight = new AtomicInteger(0);

// Flag that the channel should be closed once all of the outstanding RPC complete.
private final AtomicBoolean shutdownRequested = new AtomicBoolean();
// Flag that the channel has been closed.
private final AtomicBoolean shutdownInitiated = new AtomicBoolean();
private final ChannelHealthChecker healthChecker;

private Entry(ManagedChannel channel) {
private Entry(ManagedChannel channel, ScheduledExecutorService executor) {
this.channel = channel;
this.healthChecker = new ChannelHealthChecker(this, executor);
}

/** Returns the {@link ManagedChannel} that this entry was constructed with. */
ManagedChannel getManagedChannel() {
return this.channel;
}

/** Returns the {@link ChannelHealthChecker} that was created for this entry in its constructor. */
public ChannelHealthChecker getHealthChecker() {
return healthChecker;
}

int getAndResetMaxOutstanding() {
Expand All @@ -454,7 +479,7 @@ private boolean retain() {
// register desire to start RPC
int currentOutstanding = outstandingRpcs.incrementAndGet();

// Rough book keeping
// Rough bookkeeping
int prevMax = maxOutstanding.get();
if (currentOutstanding > prevMax) {
maxOutstanding.incrementAndGet();
Expand Down Expand Up @@ -491,6 +516,9 @@ private void release() {
*/
private void requestShutdown() {
shutdownRequested.set(true);
if (healthChecker != null) {
healthChecker.stop();
}
if (outstandingRpcs.get() == 0) {
shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.grpc;

import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPool.Entry;
import com.google.common.collect.EvictingQueue;
import java.time.Instant;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * Stub for a class that checks the health of an individual channel in a channel pool by sending
 * PingAndWarm probes at a fixed interval.
 *
 * <p>Only the bookkeeping fields are initialized at the moment. Probe scheduling, result recording
 * and health evaluation are placeholders, so {@link #healthy()} always reports {@code true} and
 * the probe counters always report zero.
 */
public class ChannelHealthChecker {

  // Configuration constants

  /** Length of the probe-history window, in minutes; used to size the result queue. */
  private static final int WINDOW_DURATION_MINUTES = 5;

  /** Spacing between probes, in seconds; used to size the result queue. */
  private static final int PROBE_RATE_SECONDS = 30;

  // The remaining constants are declared for the upcoming implementation and are not read yet.
  private static final int PROBE_DEADLINE_MILLISECONDS = 500;
  private static final int MIN_PROBES_FOR_EVALUATION = 4;
  private static final int FAILURE_PERCENT_THRESHOLD = 60;

  // Class fields

  /** The pool entry whose channel this checker is attached to. */
  final Entry entry;

  /** Executor handed in by the caller; nothing is scheduled on it yet. */
  private final ScheduledExecutorService probeExecutor;

  /** Handle for the periodic probe task; never assigned while scheduling is a stub. */
  private volatile ScheduledFuture<?> scheduledProbeFuture;

  // NOTE(review): presumably intended to guard probeResults once probes are recorded -- it is not
  // acquired anywhere yet. Confirm when the implementation lands.
  private final ReadWriteLock probeResultsLock = new ReentrantReadWriteLock();

  /** Bounded history of probe outcomes; the oldest result is evicted once capacity is reached. */
  private final EvictingQueue<ProbeResult> probeResults;

  /**
   * Result of a single probe.
   *
   * <p>Declared {@code static} because it never touches the enclosing checker; a non-static inner
   * class would make every stored result carry a hidden reference to its {@code
   * ChannelHealthChecker}.
   */
  static class ProbeResult {
    final Instant startTime;
    final boolean success;

    ProbeResult(Instant startTime, boolean success) {
      this.startTime = startTime;
      this.success = success;
    }

    /** Returns whether the probe succeeded. */
    public boolean isSuccessful() {
      return success;
    }
  }

  /**
   * Creates a health checker for the given entry.
   *
   * <p>The result queue is sized to hold one result per probe interval across the whole window
   * ({@code WINDOW_DURATION_MINUTES * 60 / PROBE_RATE_SECONDS}, i.e. 10 with the current
   * constants).
   *
   * @param entry the pool entry to be monitored
   * @param executor the executor on which probes are meant to be scheduled
   */
  public ChannelHealthChecker(Entry entry, ScheduledExecutorService executor) {
    int queueCapacity = (WINDOW_DURATION_MINUTES * 60) / PROBE_RATE_SECONDS;
    this.probeResults = EvictingQueue.create(queueCapacity);
    this.entry = entry;
    this.probeExecutor = executor;
    // Scheduling runProbe will go here
  }

  /** Cancel health checking future (No-op stub) */
  public void stop() {
    // Method stub, no operation.
  }

  /** Runs a single health probe. (No-op stub) */
  private void runProbe() {
    // Method stub, no operation.
  }

  /**
   * Callback for when a probe finishes. (No-op stub)
   *
   * @param startTime the time at which the probe was started
   * @param success whether the probe succeeded
   */
  void probeFinished(Instant startTime, boolean success) {
    // Method stub, no operation.
  }

  /**
   * Number of probes in flight plus number of probe results. (No-op stub)
   *
   * @return a newly created counter holding zero
   */
  AtomicInteger recentProbesSent() {
    return new AtomicInteger(0);
  }

  /**
   * Number of recently failed probes. (No-op stub)
   *
   * @return a newly created counter holding zero
   */
  AtomicInteger recentlyFailedProbes() {
    return new AtomicInteger(0);
  }

  /**
   * Determines if the channel is healthy. (No-op stub)
   *
   * @return A default value of true.
   */
  public boolean healthy() {
    return true;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2025 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigtable.gaxx.grpc;

import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPool.Entry;
import com.google.common.collect.ImmutableList;
import java.time.Instant;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
import javax.annotation.Nullable;

/**
 * Stub for a class that will manage the health checking in the BigtableChannelPool.
 *
 * <p>Outlier detection and eviction are placeholders: constructing an instance only stores its
 * collaborators and creates an executor on which nothing is scheduled yet.
 */
public class ChannelPoolHealthChecker {

  // Class fields

  /** Source of the entries to examine; invoked each time the current list is needed. */
  private final Supplier<ImmutableList<Entry>> entrySupplier;

  /** Time of the most recent eviction; starts at {@link Instant#MIN} and is not updated yet. */
  private Instant lastEviction;

  // NOTE(review): nothing visible here shuts this executor down. Once tasks are scheduled on it,
  // the owner will presumably need a way to stop it -- confirm the intended lifecycle.
  private final ScheduledExecutorService executor;

  /**
   * Constructor for the pool health checker.
   *
   * @param entrySupplier supplies the list of entries whose health is to be evaluated
   */
  public ChannelPoolHealthChecker(Supplier<ImmutableList<Entry>> entrySupplier) {
    this.entrySupplier = entrySupplier;
    this.lastEviction = Instant.MIN;
    this.executor = Executors.newSingleThreadScheduledExecutor();
    // Scheduling for detectAndRemoveOutlierEntries goes here
  }

  /**
   * Finds a channel that is an outlier in terms of health. (No-op stub)
   *
   * @return A default value of null.
   */
  @Nullable
  private Entry findOutlierEntry() {
    return null;
  }

  /** Periodically detects and removes outlier channels from the pool. (No-op stub) */
  private void detectAndRemoveOutlierEntries() {
    // Method stub, no operation.
  }
}
Loading