Skip to content

Removed @Experimental annotation #522

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: release_v2.4.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions query/src/main/java/tech/ydb/query/QueryClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

import javax.annotation.WillNotClose;

import io.grpc.ExperimentalApi;

import tech.ydb.core.Result;
import tech.ydb.core.grpc.GrpcTransport;
import tech.ydb.query.impl.QueryClientImpl;
Expand All @@ -18,7 +16,6 @@
*
* @author Aleksandr Gorshenin
*/
@ExperimentalApi("QueryService is experimental and API may change without notice")
public interface QueryClient extends AutoCloseable {
static Builder newClient(@WillNotClose GrpcTransport transport) {
return QueryClientImpl.newClient(transport);
Expand Down
3 changes: 0 additions & 3 deletions query/src/main/java/tech/ydb/query/QuerySession.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import java.util.concurrent.CompletableFuture;

import io.grpc.ExperimentalApi;

import tech.ydb.common.transaction.TxMode;
import tech.ydb.core.Result;
import tech.ydb.query.settings.BeginTransactionSettings;
Expand All @@ -22,7 +20,6 @@
*
* @author Aleksandr Gorshenin
*/
@ExperimentalApi("QueryService is experimental and API may change without notice")
public interface QuerySession extends AutoCloseable {

/**
Expand Down
3 changes: 0 additions & 3 deletions query/src/main/java/tech/ydb/query/QueryStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import java.util.concurrent.CompletableFuture;

import io.grpc.ExperimentalApi;

import tech.ydb.core.Issue;
import tech.ydb.core.Result;
import tech.ydb.query.result.QueryInfo;
Expand All @@ -13,7 +11,6 @@
*
* @author Aleksandr Gorshenin
*/
@ExperimentalApi("QueryService is experimental and API may change without notice")
public interface QueryStream {
interface PartsHandler {
default void onIssues(Issue[] issues) { }
Expand Down
3 changes: 0 additions & 3 deletions query/src/main/java/tech/ydb/query/QueryTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@

import java.util.concurrent.CompletableFuture;

import io.grpc.ExperimentalApi;

import tech.ydb.common.transaction.YdbTransaction;
import tech.ydb.core.Result;
import tech.ydb.core.Status;
Expand All @@ -22,7 +20,6 @@
*
* @author Aleksandr Gorshenin
*/
@ExperimentalApi("QueryService is experimental and API may change without notice")
public interface QueryTransaction extends YdbTransaction {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@
import java.io.InputStream;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicBoolean;

import com.google.common.io.ByteStreams;
import io.grpc.CallOptions;
Expand Down Expand Up @@ -71,21 +70,19 @@ public void close() {
}

private static class CallProxy<ReqT, RespT> {
final RequestProxy serverCallListener;
final ResponseProxy clientCallListener;
final ClientProxy clientProxy;
final ServerProxy serverProxy;

CallProxy(ServerCall<ReqT, RespT> serverCall, ClientCall<ReqT, RespT> clientCall) {
serverCallListener = new RequestProxy(clientCall);
clientCallListener = new ResponseProxy(serverCall);
clientProxy = new ClientProxy(clientCall);
serverProxy = new ServerProxy(serverCall);
}

private class RequestProxy extends ServerCall.Listener<ReqT> {
private final Lock clientCallLock = new ReentrantLock();
private class ClientProxy extends ServerCall.Listener<ReqT> {
private final ClientCall<ReqT, ?> clientCall;
// Hold 'this' lock when accessing
private boolean needToRequest;
private final AtomicBoolean needToRequest = new AtomicBoolean(false);

RequestProxy(ClientCall<ReqT, ?> clientCall) {
ClientProxy(ClientCall<ReqT, ?> clientCall) {
this.clientCall = clientCall;
}

Expand All @@ -102,47 +99,34 @@ public void onHalfClose() {
@Override
public void onMessage(ReqT message) {
clientCall.sendMessage(message);
clientCallLock.lock();
try {
if (clientCall.isReady()) {
clientCallListener.serverCall.request(1);
} else {
// The outgoing call is not ready for more requests. Stop requesting additional data and
// wait for it to catch up.
needToRequest = true;
}
} finally {
clientCallLock.unlock();
if (clientCall.isReady()) {
serverProxy.serverCall.request(1);
} else {
// The outgoing call is not ready for more requests. Stop requesting additional data and
// wait for it to catch up.
needToRequest.set(true);
}
}

@Override
public void onReady() {
clientCallListener.onServerReady();
serverProxy.onServerReady();
}

// Called from ResponseProxy, which is a different thread than the ServerCall.Listener
// callbacks.
void onClientReady() {
clientCallLock.lock();
try {
if (needToRequest) {
clientCallListener.serverCall.request(1);
needToRequest = false;
}
} finally {
clientCallLock.unlock();
if (needToRequest.compareAndSet(true, false)) {
serverProxy.serverCall.request(1);
}
}
}

private class ResponseProxy extends ClientCall.Listener<RespT> {
private final Lock serverCallLock = new ReentrantLock();
private class ServerProxy extends ClientCall.Listener<RespT> {
private final ServerCall<?, RespT> serverCall;
// Hold 'this' lock when accessing
private boolean needToRequest;
private final AtomicBoolean needToRequest = new AtomicBoolean(false);

ResponseProxy(ServerCall<?, RespT> serverCall) {
ServerProxy(ServerCall<?, RespT> serverCall) {
this.serverCall = serverCall;
}

Expand All @@ -159,36 +143,25 @@ public void onHeaders(Metadata headers) {
@Override
public void onMessage(RespT message) {
serverCall.sendMessage(message);
serverCallLock.lock();
try {
if (serverCall.isReady()) {
serverCallListener.clientCall.request(1);
} else {
// The incoming call is not ready for more responses. Stop requesting additional data
// and wait for it to catch up.
needToRequest = true;
}
} finally {
serverCallLock.unlock();
if (serverCall.isReady()) {
clientProxy.clientCall.request(1);
} else {
// The incoming call is not ready for more responses. Stop requesting additional data
// and wait for it to catch up.
needToRequest.set(true);
}
}

@Override
public void onReady() {
serverCallListener.onClientReady();
clientProxy.onClientReady();
}

// Called from RequestProxy, which is a different thread than the ClientCall.Listener
// callbacks.
void onServerReady() {
serverCallLock.lock();
try {
if (needToRequest) {
serverCallListener.clientCall.request(1);
needToRequest = false;
}
} finally {
serverCallLock.unlock();
if (needToRequest.compareAndSet(true, false)) {
clientProxy.clientCall.request(1);
}
}
}
Expand All @@ -199,10 +172,10 @@ private class ProxyHandler<ReqT, RespT> implements ServerCallHandler<ReqT, RespT
public ServerCall.Listener<ReqT> startCall(ServerCall<ReqT, RespT> serverCall, Metadata metadata) {
ClientCall<ReqT, RespT> clientCall = target.newCall(serverCall.getMethodDescriptor(), CallOptions.DEFAULT);
CallProxy<ReqT, RespT> proxy = new CallProxy<>(serverCall, clientCall);
clientCall.start(proxy.clientCallListener, metadata);
serverCall.request(1);
clientCall.start(proxy.serverProxy, metadata);
clientCall.request(1);
return proxy.serverCallListener;
serverCall.request(1);
return proxy.clientProxy;
}
}

Expand Down
Loading