Skip to content

Commit ea7e05e

Browse files
authored
Merge pull request #923 from scalecube/enhance-bidirectional-communication
Added file-upload functionality
1 parent 00cab25 commit ea7e05e

File tree

29 files changed

+1032
-318
lines changed

29 files changed

+1032
-318
lines changed

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@
5858

5959
<properties>
6060
<scalecube-cluster.version>2.7.2</scalecube-cluster.version>
61-
<scalecube-security.version>1.1.4</scalecube-security.version>
61+
<scalecube-security.version>1.1.6</scalecube-security.version>
6262

6363
<reactor.version>2024.0.8</reactor.version>
6464
<netty.version>4.2.4.Final</netty.version>

services-api/src/main/java/io/scalecube/services/RequestContext.java

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import static io.scalecube.services.api.ServiceMessage.HEADER_QUALIFIER;
44
import static io.scalecube.services.api.ServiceMessage.HEADER_REQUEST_METHOD;
5+
import static io.scalecube.services.api.ServiceMessage.HEADER_UPLOAD_FILENAME;
56
import static io.scalecube.services.auth.Principal.NULL_PRINCIPAL;
67

78
import io.scalecube.services.auth.Principal;
@@ -12,7 +13,6 @@
1213
import java.util.Map;
1314
import java.util.Map.Entry;
1415
import java.util.Objects;
15-
import java.util.StringJoiner;
1616
import java.util.stream.Stream;
1717
import org.slf4j.Logger;
1818
import org.slf4j.LoggerFactory;
@@ -159,6 +159,15 @@ public String requestQualifier() {
159159
return header(HEADER_QUALIFIER);
160160
}
161161

162+
/**
163+
* Returns upload filename from headers.
164+
*
165+
* @return upload filename, or {@code null} if not found
166+
*/
167+
public String uploadFilename() {
168+
return header(HEADER_UPLOAD_FILENAME);
169+
}
170+
162171
/**
163172
* Returns principal object (authenticated entity) associated with the request.
164173
*
@@ -348,11 +357,4 @@ public static Mono<RequestContext> deferSecured() {
348357
}
349358
});
350359
}
351-
352-
@Override
353-
public String toString() {
354-
return new StringJoiner(", ", RequestContext.class.getSimpleName() + "[", "]")
355-
.add("source=" + source)
356-
.toString();
357-
}
358360
}

services-api/src/main/java/io/scalecube/services/ServiceCall.java

Lines changed: 78 additions & 134 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,6 @@
3131
import java.util.Optional;
3232
import java.util.function.Function;
3333
import org.reactivestreams.Publisher;
34-
import org.slf4j.Logger;
35-
import org.slf4j.LoggerFactory;
3634
import reactor.core.Exceptions;
3735
import reactor.core.publisher.Flux;
3836
import reactor.core.publisher.Mono;
@@ -46,7 +44,8 @@ public class ServiceCall implements AutoCloseable {
4644
private ServiceClientErrorMapper errorMapper = DefaultErrorMapper.INSTANCE;
4745
private Map<String, String> credentials = Collections.emptyMap();
4846
private String contentType = ServiceMessage.DEFAULT_DATA_FORMAT;
49-
private Logger logger;
47+
48+
// private Logger logger;
5049

5150
public ServiceCall() {}
5251

@@ -57,7 +56,6 @@ private ServiceCall(ServiceCall other) {
5756
this.errorMapper = other.errorMapper;
5857
this.contentType = other.contentType;
5958
this.credentials = Collections.unmodifiableMap(new HashMap<>(other.credentials));
60-
this.logger = other.logger;
6159
}
6260

6361
/**
@@ -144,42 +142,6 @@ public ServiceCall contentType(String contentType) {
144142
return target;
145143
}
146144

147-
/**
148-
* Setter for {@link ServiceCall} {@code logger}.
149-
*
150-
* @param name logger name (optional)
151-
* @return new {@link ServiceCall} instance.
152-
*/
153-
public ServiceCall logger(String name) {
154-
ServiceCall target = new ServiceCall(this);
155-
target.logger = name != null ? LoggerFactory.getLogger(name) : null;
156-
return target;
157-
}
158-
159-
/**
160-
* Setter for {@link ServiceCall} {@code logger}.
161-
*
162-
* @param clazz logger name (optional)
163-
* @return new {@link ServiceCall} instance.
164-
*/
165-
public ServiceCall logger(Class<?> clazz) {
166-
ServiceCall target = new ServiceCall(this);
167-
target.logger = clazz != null ? LoggerFactory.getLogger(clazz) : null;
168-
return target;
169-
}
170-
171-
/**
172-
* Setter for {@link ServiceCall} {@code logger}.
173-
*
174-
* @param logger logger (optional)
175-
* @return new {@link ServiceCall} instance.
176-
*/
177-
public ServiceCall logger(Logger logger) {
178-
ServiceCall target = new ServiceCall(this);
179-
target.logger = logger;
180-
return target;
181-
}
182-
183145
/**
184146
* Invokes fire-and-forget request.
185147
*
@@ -209,50 +171,37 @@ public Mono<ServiceMessage> requestOne(ServiceMessage request) {
209171
*/
210172
public Mono<ServiceMessage> requestOne(ServiceMessage request, Type responseType) {
211173
return Mono.defer(
212-
() -> {
213-
ServiceMethodInvoker methodInvoker;
214-
if (serviceRegistry != null
215-
&& (methodInvoker = serviceRegistry.lookupInvoker(request)) != null) {
216-
// local service
217-
return methodInvoker
218-
.invokeOne(request)
219-
.map(this::throwIfError)
220-
.contextWrite(
221-
context -> {
222-
if (context.hasKey(RequestContext.class)) {
223-
return context;
224-
} else {
225-
return new RequestContext(context)
226-
.headers(request.headers())
227-
.request(request)
228-
.principal(NULL_PRINCIPAL);
229-
}
230-
});
231-
} else {
232-
// remote service
233-
Objects.requireNonNull(transport, "[requestOne] transport");
234-
return Mono.fromCallable(() -> serviceLookup(request))
235-
.flatMap(
236-
serviceReference ->
237-
transport
238-
.create(serviceReference)
239-
.requestResponse(request, responseType)
240-
.map(this::throwIfError));
241-
}
242-
})
243-
.doOnSuccess(
244-
response -> {
245-
if (logger != null && logger.isDebugEnabled()) {
246-
logger.debug(
247-
"[{}] request: {}, response: {}", request.qualifier(), request, response);
248-
}
249-
})
250-
.doOnError(
251-
ex -> {
252-
if (logger != null) {
253-
logger.error("[{}][error] request: {}", request.qualifier(), request, ex);
254-
}
255-
});
174+
() -> {
175+
ServiceMethodInvoker methodInvoker;
176+
if (serviceRegistry != null
177+
&& (methodInvoker = serviceRegistry.lookupInvoker(request)) != null) {
178+
// local service
179+
return methodInvoker
180+
.invokeOne(request)
181+
.map(this::throwIfError)
182+
.contextWrite(
183+
context -> {
184+
if (context.hasKey(RequestContext.class)) {
185+
return context;
186+
} else {
187+
return new RequestContext(context)
188+
.headers(request.headers())
189+
.request(request)
190+
.principal(NULL_PRINCIPAL);
191+
}
192+
});
193+
} else {
194+
// remote service
195+
Objects.requireNonNull(transport, "[requestOne] transport");
196+
return Mono.fromCallable(() -> serviceLookup(request))
197+
.flatMap(
198+
serviceReference ->
199+
transport
200+
.create(serviceReference)
201+
.requestResponse(request, responseType)
202+
.map(this::throwIfError));
203+
}
204+
});
256205
}
257206

258207
/**
@@ -274,55 +223,37 @@ public Flux<ServiceMessage> requestMany(ServiceMessage request) {
274223
*/
275224
public Flux<ServiceMessage> requestMany(ServiceMessage request, Type responseType) {
276225
return Flux.defer(
277-
() -> {
278-
ServiceMethodInvoker methodInvoker;
279-
if (serviceRegistry != null
280-
&& (methodInvoker = serviceRegistry.lookupInvoker(request)) != null) {
281-
// local service
282-
return methodInvoker
283-
.invokeMany(request)
284-
.map(this::throwIfError)
285-
.contextWrite(
286-
context -> {
287-
if (context.hasKey(RequestContext.class)) {
288-
return context;
289-
} else {
290-
return new RequestContext(context)
291-
.headers(request.headers())
292-
.request(request)
293-
.principal(NULL_PRINCIPAL);
294-
}
295-
});
296-
} else {
297-
// remote service
298-
Objects.requireNonNull(transport, "[requestMany] transport");
299-
return Mono.fromCallable(() -> serviceLookup(request))
300-
.flatMapMany(
301-
serviceReference ->
302-
transport
303-
.create(serviceReference)
304-
.requestStream(request, responseType)
305-
.map(this::throwIfError));
306-
}
307-
})
308-
.doOnSubscribe(
309-
s -> {
310-
if (logger != null && logger.isDebugEnabled()) {
311-
logger.debug("[{}][subscribe] request: {}", request.qualifier(), request);
312-
}
313-
})
314-
.doOnComplete(
315-
() -> {
316-
if (logger != null && logger.isDebugEnabled()) {
317-
logger.debug("[{}][complete] request: {}", request.qualifier(), request);
318-
}
319-
})
320-
.doOnError(
321-
ex -> {
322-
if (logger != null) {
323-
logger.error("[{}][error] request: {}", request.qualifier(), request, ex);
324-
}
325-
});
226+
() -> {
227+
ServiceMethodInvoker methodInvoker;
228+
if (serviceRegistry != null
229+
&& (methodInvoker = serviceRegistry.lookupInvoker(request)) != null) {
230+
// local service
231+
return methodInvoker
232+
.invokeMany(request)
233+
.map(this::throwIfError)
234+
.contextWrite(
235+
context -> {
236+
if (context.hasKey(RequestContext.class)) {
237+
return context;
238+
} else {
239+
return new RequestContext(context)
240+
.headers(request.headers())
241+
.request(request)
242+
.principal(NULL_PRINCIPAL);
243+
}
244+
});
245+
} else {
246+
// remote service
247+
Objects.requireNonNull(transport, "[requestMany] transport");
248+
return Mono.fromCallable(() -> serviceLookup(request))
249+
.flatMapMany(
250+
serviceReference ->
251+
transport
252+
.create(serviceReference)
253+
.requestStream(request, responseType)
254+
.map(this::throwIfError));
255+
}
256+
});
326257
}
327258

328259
/**
@@ -353,7 +284,20 @@ public Flux<ServiceMessage> requestBidirectional(
353284
if (serviceRegistry != null
354285
&& (methodInvoker = serviceRegistry.lookupInvoker(request)) != null) {
355286
// local service
356-
return methodInvoker.invokeBidirectional(messages).map(this::throwIfError);
287+
return methodInvoker
288+
.invokeBidirectional(messages)
289+
.map(this::throwIfError)
290+
.contextWrite(
291+
context -> {
292+
if (context.hasKey(RequestContext.class)) {
293+
return context;
294+
} else {
295+
return new RequestContext(context)
296+
.headers(request.headers())
297+
.request(request)
298+
.principal(NULL_PRINCIPAL);
299+
}
300+
});
357301
} else {
358302
// remote service
359303
Objects.requireNonNull(transport, "[requestBidirectional] transport");

services-api/src/main/java/io/scalecube/services/api/ServiceMessage.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@ public final class ServiceMessage {
3030
/** Request method header. */
3131
public static final String HEADER_REQUEST_METHOD = "requestMethod";
3232

33+
/** Upload filename header. */
34+
public static final String HEADER_UPLOAD_FILENAME = "uploadFilename";
35+
3336
/** Null value for error type. */
3437
public static final int NULL_ERROR_TYPE = -1;
3538

@@ -199,6 +202,15 @@ public String requestMethod() {
199202
return headers.get(HEADER_REQUEST_METHOD);
200203
}
201204

205+
/**
206+
* Returns upload filename header.
207+
*
208+
* @return upload filename, or null if such header doesn't exist.
209+
*/
210+
public String uploadFilename() {
211+
return headers.get(HEADER_UPLOAD_FILENAME);
212+
}
213+
202214
@Override
203215
public String toString() {
204216
return new StringJoiner(", ", "ServiceMessage" + "[", "]")
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package io.scalecube.services.files;
2+
3+
import java.io.IOException;
4+
import java.nio.ByteBuffer;
5+
import java.nio.channels.FileChannel;
6+
import java.nio.file.Path;
7+
import reactor.core.publisher.Flux;
8+
9+
public class FileChannelFlux {
10+
11+
private static final int DEFAULT_CHUNK_SIZE = 64 * 1024;
12+
13+
private FileChannelFlux() {
14+
// Do not instantiate
15+
}
16+
17+
public static Flux<byte[]> createFrom(Path filePath) {
18+
return createFrom(filePath, DEFAULT_CHUNK_SIZE);
19+
}
20+
21+
public static Flux<byte[]> createFrom(Path filePath, int chunkSize) {
22+
return Flux.using(
23+
() -> FileChannel.open(filePath),
24+
channel -> {
25+
final var chunkBuffer = ByteBuffer.allocate(chunkSize);
26+
return Flux.generate(
27+
() -> channel,
28+
(fc, sink) -> {
29+
try {
30+
chunkBuffer.clear();
31+
int read = fc.read(chunkBuffer);
32+
if (read == -1) {
33+
sink.complete();
34+
return fc;
35+
}
36+
if (read > 0) {
37+
chunkBuffer.flip();
38+
byte[] bytes = new byte[chunkBuffer.remaining()];
39+
chunkBuffer.get(bytes);
40+
sink.next(bytes);
41+
}
42+
return fc;
43+
} catch (IOException e) {
44+
sink.error(e);
45+
return fc;
46+
}
47+
});
48+
},
49+
channel -> {
50+
try {
51+
channel.close();
52+
} catch (IOException ex) {
53+
// no-op
54+
}
55+
});
56+
}
57+
}

0 commit comments

Comments
 (0)