Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 0 additions & 58 deletions src/main/java/com/clustercrew/messagequeue/MessageQueueServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -214,64 +214,6 @@ public void getMetadata(MetadataRequest request, StreamObserver<MetadataResponse
}
}

@Override
public void getBrokerAddress(BrokerAddressRequest request, StreamObserver<BrokerAddressResponse> responseObserver) {
try {
String brokerId = request.getBrokerId();
String brokerAddress = zkClient.getBrokerAddress(brokerId);

BrokerAddressResponse response = BrokerAddressResponse.newBuilder()
.setSuccess(true)
.setBrokerAddress(brokerAddress)
.build();
responseObserver.onNext(response);
} catch (Exception e) {
BrokerAddressResponse response = BrokerAddressResponse.newBuilder()
.setSuccess(false)
.setErrorMessage(e.getMessage())
.build();
responseObserver.onNext(response);
} finally {
responseObserver.onCompleted();
}
}

@Override
public void shutdown(ShutdownRequest request, StreamObserver<ShutdownResponse> responseObserver) {
try {
// Check if broker id is same as the one in the request
if (!brokerId.equals(request.getBrokerId())) {
// Use zookeeper to get the broker address for the broker id
String brokerAddress = zkClient.getBrokerAddress(request.getBrokerId());
// Return error with the broker address
ShutdownResponse response = ShutdownResponse.newBuilder()
.setSuccess(false)
.setErrorMessage("Broker ID does not match")
.setBrokerAddress(brokerAddress)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
return;
} else {
// Stop the server
stopServer();
// Return success
ShutdownResponse response = ShutdownResponse.newBuilder()
.setSuccess(true)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
} catch (Exception e) {
ShutdownResponse response = ShutdownResponse.newBuilder()
.setSuccess(false)
.setErrorMessage(e.getMessage())
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
}

private Partition getOrCreatePartition(String topic, int partition) throws Exception {
synchronized (topicPartitions) {
topicPartitions.computeIfAbsent(topic, k -> new HashMap<>());
Expand Down
22 changes: 0 additions & 22 deletions src/main/proto/message_queue.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@ service MessageQueue {
rpc ProduceMessages(ProduceMessagesRequest) returns (ProduceMessagesResponse);
rpc ConsumeMessages(ConsumeMessagesRequest) returns (ConsumeMessagesResponse);
rpc GetMetadata(MetadataRequest) returns (MetadataResponse);
rpc GetBrokerAddress(BrokerAddressRequest) returns (BrokerAddressResponse);
rpc Shutdown(ShutdownRequest) returns (ShutdownResponse);
}

message Message {
Expand Down Expand Up @@ -59,24 +57,4 @@ message MetadataResponse {
message PartitionMetadata {
int32 partition_id = 1;
string broker_address = 2;
}

message BrokerAddressRequest {
string broker_id = 1;
}

message BrokerAddressResponse {
string broker_address = 1;
bool success = 2;
string error_message = 3;
}

message ShutdownRequest {
string broker_id = 1;
}

message ShutdownResponse {
bool success = 1;
string error_message = 2;
string broker_address = 3;
}