diff --git a/query/src/main/java/tech/ydb/query/QueryClient.java b/query/src/main/java/tech/ydb/query/QueryClient.java index b0b49667e..50cdf2bec 100644 --- a/query/src/main/java/tech/ydb/query/QueryClient.java +++ b/query/src/main/java/tech/ydb/query/QueryClient.java @@ -6,8 +6,6 @@ import javax.annotation.WillNotClose; -import io.grpc.ExperimentalApi; - import tech.ydb.core.Result; import tech.ydb.core.grpc.GrpcTransport; import tech.ydb.query.impl.QueryClientImpl; @@ -18,7 +16,6 @@ * * @author Aleksandr Gorshenin */ -@ExperimentalApi("QueryService is experimental and API may change without notice") public interface QueryClient extends AutoCloseable { static Builder newClient(@WillNotClose GrpcTransport transport) { return QueryClientImpl.newClient(transport); diff --git a/query/src/main/java/tech/ydb/query/QuerySession.java b/query/src/main/java/tech/ydb/query/QuerySession.java index 9bbee82a4..f905d3c26 100644 --- a/query/src/main/java/tech/ydb/query/QuerySession.java +++ b/query/src/main/java/tech/ydb/query/QuerySession.java @@ -2,8 +2,6 @@ import java.util.concurrent.CompletableFuture; -import io.grpc.ExperimentalApi; - import tech.ydb.common.transaction.TxMode; import tech.ydb.core.Result; import tech.ydb.query.settings.BeginTransactionSettings; @@ -22,7 +20,6 @@ * * @author Aleksandr Gorshenin */ -@ExperimentalApi("QueryService is experimental and API may change without notice") public interface QuerySession extends AutoCloseable { /** diff --git a/query/src/main/java/tech/ydb/query/QueryStream.java b/query/src/main/java/tech/ydb/query/QueryStream.java index a4ee3ba7f..c4038db4b 100644 --- a/query/src/main/java/tech/ydb/query/QueryStream.java +++ b/query/src/main/java/tech/ydb/query/QueryStream.java @@ -2,8 +2,6 @@ import java.util.concurrent.CompletableFuture; -import io.grpc.ExperimentalApi; - import tech.ydb.core.Issue; import tech.ydb.core.Result; import tech.ydb.query.result.QueryInfo; @@ -13,7 +11,6 @@ * * @author Aleksandr Gorshenin */ -@ExperimentalApi("QueryService is experimental and API may change without notice") public interface QueryStream { interface PartsHandler { default void onIssues(Issue[] issues) { } diff --git a/query/src/main/java/tech/ydb/query/QueryTransaction.java b/query/src/main/java/tech/ydb/query/QueryTransaction.java index 89e943b48..4780c62df 100644 --- a/query/src/main/java/tech/ydb/query/QueryTransaction.java +++ b/query/src/main/java/tech/ydb/query/QueryTransaction.java @@ -2,8 +2,6 @@ import java.util.concurrent.CompletableFuture; -import io.grpc.ExperimentalApi; - import tech.ydb.common.transaction.YdbTransaction; import tech.ydb.core.Result; import tech.ydb.core.Status; @@ -22,7 +20,6 @@ * * @author Aleksandr Gorshenin */ -@ExperimentalApi("QueryService is experimental and API may change without notice") public interface QueryTransaction extends YdbTransaction { /** diff --git a/tests/common/src/main/java/tech/ydb/test/integration/docker/GrpcProxyServer.java b/tests/common/src/main/java/tech/ydb/test/integration/docker/GrpcProxyServer.java index a7974edf6..c16e92b6d 100644 --- a/tests/common/src/main/java/tech/ydb/test/integration/docker/GrpcProxyServer.java +++ b/tests/common/src/main/java/tech/ydb/test/integration/docker/GrpcProxyServer.java @@ -5,8 +5,7 @@ import java.io.InputStream; import java.net.InetAddress; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.io.ByteStreams; import io.grpc.CallOptions; @@ -71,21 +70,19 @@ public void close() { } private static class CallProxy { - final RequestProxy serverCallListener; - final ResponseProxy clientCallListener; + final ClientProxy clientProxy; + final ServerProxy serverProxy; CallProxy(ServerCall serverCall, ClientCall clientCall) { - serverCallListener = new RequestProxy(clientCall); - clientCallListener = new ResponseProxy(serverCall); + clientProxy = new ClientProxy(clientCall); + serverProxy = new ServerProxy(serverCall); } - private class RequestProxy extends ServerCall.Listener { - private final Lock clientCallLock = new ReentrantLock(); + private class ClientProxy extends ServerCall.Listener { private final ClientCall clientCall; - // Hold 'this' lock when accessing - private boolean needToRequest; + private final AtomicBoolean needToRequest = new AtomicBoolean(false); - RequestProxy(ClientCall clientCall) { + ClientProxy(ClientCall clientCall) { this.clientCall = clientCall; } @@ -102,47 +99,34 @@ public void onHalfClose() { @Override public void onMessage(ReqT message) { clientCall.sendMessage(message); - clientCallLock.lock(); - try { - if (clientCall.isReady()) { - clientCallListener.serverCall.request(1); - } else { - // The outgoing call is not ready for more requests. Stop requesting additional data and - // wait for it to catch up. - needToRequest = true; - } - } finally { - clientCallLock.unlock(); + if (clientCall.isReady()) { + serverProxy.serverCall.request(1); + } else { + // The outgoing call is not ready for more requests. Stop requesting additional data and + // wait for it to catch up. + needToRequest.set(true); } } @Override public void onReady() { - clientCallListener.onServerReady(); + serverProxy.onServerReady(); } // Called from ResponseProxy, which is a different thread than the ServerCall.Listener // callbacks. void onClientReady() { - clientCallLock.lock(); - try { - if (needToRequest) { - clientCallListener.serverCall.request(1); - needToRequest = false; - } - } finally { - clientCallLock.unlock(); + if (needToRequest.compareAndSet(true, false)) { + serverProxy.serverCall.request(1); } } } - private class ResponseProxy extends ClientCall.Listener { - private final Lock serverCallLock = new ReentrantLock(); + private class ServerProxy extends ClientCall.Listener { private final ServerCall serverCall; - // Hold 'this' lock when accessing - private boolean needToRequest; + private final AtomicBoolean needToRequest = new AtomicBoolean(false); - ResponseProxy(ServerCall serverCall) { + ServerProxy(ServerCall serverCall) { this.serverCall = serverCall; } @@ -159,36 +143,25 @@ public void onHeaders(Metadata headers) { @Override public void onMessage(RespT message) { serverCall.sendMessage(message); - serverCallLock.lock(); - try { - if (serverCall.isReady()) { - serverCallListener.clientCall.request(1); - } else { - // The incoming call is not ready for more responses. Stop requesting additional data - // and wait for it to catch up. - needToRequest = true; - } - } finally { - serverCallLock.unlock(); + if (serverCall.isReady()) { + clientProxy.clientCall.request(1); + } else { + // The incoming call is not ready for more responses. Stop requesting additional data + // and wait for it to catch up. + needToRequest.set(true); } } @Override public void onReady() { - serverCallListener.onClientReady(); + clientProxy.onClientReady(); } // Called from RequestProxy, which is a different thread than the ClientCall.Listener // callbacks. void onServerReady() { - serverCallLock.lock(); - try { - if (needToRequest) { - serverCallListener.clientCall.request(1); - needToRequest = false; - } - } finally { - serverCallLock.unlock(); + if (needToRequest.compareAndSet(true, false)) { + clientProxy.clientCall.request(1); } } } @@ -199,10 +172,10 @@ private class ProxyHandler implements ServerCallHandler startCall(ServerCall serverCall, Metadata metadata) { ClientCall clientCall = target.newCall(serverCall.getMethodDescriptor(), CallOptions.DEFAULT); CallProxy proxy = new CallProxy<>(serverCall, clientCall); - clientCall.start(proxy.clientCallListener, metadata); - serverCall.request(1); + clientCall.start(proxy.serverProxy, metadata); clientCall.request(1); - return proxy.serverCallListener; + serverCall.request(1); + return proxy.clientProxy; } }