From 8554f15cf0201e554ec99f17605f590d9360e55c Mon Sep 17 00:00:00 2001 From: yadavpa1 Date: Sun, 8 Dec 2024 00:39:24 -0600 Subject: [PATCH] Removed code from server to shutdown brokers --- .../messagequeue/MessageQueueServer.java | 58 ------------------- src/main/proto/message_queue.proto | 22 ------- 2 files changed, 80 deletions(-) diff --git a/src/main/java/com/clustercrew/messagequeue/MessageQueueServer.java b/src/main/java/com/clustercrew/messagequeue/MessageQueueServer.java index 965f62a..69c10a7 100644 --- a/src/main/java/com/clustercrew/messagequeue/MessageQueueServer.java +++ b/src/main/java/com/clustercrew/messagequeue/MessageQueueServer.java @@ -214,64 +214,6 @@ public void getMetadata(MetadataRequest request, StreamObserver responseObserver) { - try { - String brokerId = request.getBrokerId(); - String brokerAddress = zkClient.getBrokerAddress(brokerId); - - BrokerAddressResponse response = BrokerAddressResponse.newBuilder() - .setSuccess(true) - .setBrokerAddress(brokerAddress) - .build(); - responseObserver.onNext(response); - } catch (Exception e) { - BrokerAddressResponse response = BrokerAddressResponse.newBuilder() - .setSuccess(false) - .setErrorMessage(e.getMessage()) - .build(); - responseObserver.onNext(response); - } finally { - responseObserver.onCompleted(); - } - } - - @Override - public void shutdown(ShutdownRequest request, StreamObserver responseObserver) { - try { - // Check if broker id is same as the one in the request - if (!brokerId.equals(request.getBrokerId())) { - // Use zookeeper to get the broker address for the broker id - String brokerAddress = zkClient.getBrokerAddress(request.getBrokerId()); - // Return error with the broker address - ShutdownResponse response = ShutdownResponse.newBuilder() - .setSuccess(false) - .setErrorMessage("Broker ID does not match") - .setBrokerAddress(brokerAddress) - .build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - return; - } else { - // Stop the server - stopServer(); - // Return success - ShutdownResponse response = ShutdownResponse.newBuilder() - .setSuccess(true) - .build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } - } catch (Exception e) { - ShutdownResponse response = ShutdownResponse.newBuilder() - .setSuccess(false) - .setErrorMessage(e.getMessage()) - .build(); - responseObserver.onNext(response); - responseObserver.onCompleted(); - } - } - private Partition getOrCreatePartition(String topic, int partition) throws Exception { synchronized (topicPartitions) { topicPartitions.computeIfAbsent(topic, k -> new HashMap<>()); diff --git a/src/main/proto/message_queue.proto b/src/main/proto/message_queue.proto index 9b629c3..b007c02 100644 --- a/src/main/proto/message_queue.proto +++ b/src/main/proto/message_queue.proto @@ -8,8 +8,6 @@ service MessageQueue { rpc ProduceMessages(ProduceMessagesRequest) returns (ProduceMessagesResponse); rpc ConsumeMessages(ConsumeMessagesRequest) returns (ConsumeMessagesResponse); rpc GetMetadata(MetadataRequest) returns (MetadataResponse); - rpc GetBrokerAddress(BrokerAddressRequest) returns (BrokerAddressResponse); - rpc Shutdown(ShutdownRequest) returns (ShutdownResponse); } message Message { @@ -59,24 +57,4 @@ message MetadataResponse { message PartitionMetadata { int32 partition_id = 1; string broker_address = 2; -} - -message BrokerAddressRequest { - string broker_id = 1; -} - -message BrokerAddressResponse { - string broker_address = 1; - bool success = 2; - string error_message = 3; -} - -message ShutdownRequest { - string broker_id = 1; -} - -message ShutdownResponse { - bool success = 1; - string error_message = 2; - string broker_address = 3; } \ No newline at end of file