diff --git a/CMakeLists.txt b/CMakeLists.txt index f50a401..5c46f05 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -38,6 +38,7 @@ include_directories("${CMAKE_CURRENT_SOURCE_DIR}/common") include_directories("${CMAKE_CURRENT_BINARY_DIR}/producer") include_directories("${CMAKE_CURRENT_BINARY_DIR}/consumer") include_directories("${CMAKE_CURRENT_BINARY_DIR}/sys_admin") +include_directories("${CMAKE_CURRENT_BINARY_DIR}/test") # dmq_grpc_proto library add_library(dmq_grpc_proto @@ -149,5 +150,15 @@ target_link_libraries(sys_admin_client sys_admin ) +# Test executable +add_executable( performace + test/performace.cc +) +target_link_libraries (performace + producer + consumer_group + -static-libstdc++ -static-libgcc + +) # Set compiler flags for position-independent code for building shared libraries set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC") diff --git a/test/consumer.h b/test/consumer.h new file mode 100644 index 0000000..cb75c31 --- /dev/null +++ b/test/consumer.h @@ -0,0 +1,28 @@ +#ifndef CONSUMER_H +#define CONSUMER_H + +#include +#include +#include + +struct MessageResponse { + std::string key; + std::string value; + std::string topic; + int timestamp; +}; + +class Consumer { +private: + class Impl; + std::unique_ptr impl_; + std::string consumer_id; + +public: + Consumer(const std::vector &bootstrap_servers, std::string consumer_id); + ~Consumer(); + std::vector ConsumeMessage(std::string group_id, std::string topic, int partition, int offset, int max_messages); + std::string get_consumer_id(); +}; + +#endif // CONSUMER_H \ No newline at end of file diff --git a/test/consumer_group.h b/test/consumer_group.h new file mode 100644 index 0000000..3d55bb6 --- /dev/null +++ b/test/consumer_group.h @@ -0,0 +1,36 @@ +#ifndef CONSUMER_GROUP_H +#define CONSUMER_GROUP_H + +#include +#include +#include +#include +#include "consumer.h" + +struct topic_state { + std::string topic; + int partition; + int offset; +}; + +class ConsumerGroup { +private: + std::string tag; + std::string group_id; + std::vector> consumers_; + std::unordered_map>> consumer_topic_state_; + std::unordered_map topic_partition_consumer_; + + bool IsTopicConsumed(std::string topic, int partition); + +public: + ConsumerGroup(std::string tag, std::string group_id); + ~ConsumerGroup(); + bool AddConsumer(const std::vector& bootstrap_servers, std::string consumer_id, std::vector topics, std::vector partitions, std::vector offsets); + bool RemoveConsumer(std::string consumer_id); + std::vector ConsumeMessage(std::string topic, int partition, int max_messages); + void PrintConsumerGroup(); +}; + +#endif // CONSUMER_GROUP_H \ No newline at end of file diff --git a/test/performace.cc b/test/performace.cc new file mode 100644 index 0000000..4235f02 --- /dev/null +++ b/test/performace.cc @@ -0,0 +1,158 @@ +#include +#include +#include +#include +#include +#include +#include +#include "producer.h" +#include "consumer_group.h" +#include "chrono" + +using namespace std; + +// Utility function to generate random messages +string GenerateRandomString(size_t length) { + static const string charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"; + random_device rd; + mt19937 gen(rd()); + uniform_int_distribution<> dis(0, charset.size() - 1); + string result; + for (size_t i = 0; i < length; ++i) { + result += charset[dis(gen)]; + } + return result; +} + +// Test case 1: Single Producer, Single Consumer Group +void TestCase1() { + cout << "Running TestCase1: Single Producer, Single Consumer Group" << endl; + + vector bootstrap_servers = {"localhost:8080"}; + Producer producer(bootstrap_servers, 1000, 500, "producer1"); + ConsumerGroup consumerGroup("tag1", "group1"); + + consumerGroup.AddConsumer(bootstrap_servers, "consumer1", {"test_topic"}, {0}, {0}); + + const int totalMessages = 10000; + auto startTime = chrono::high_resolution_clock::now(); + + // Producer sends messages + for (int i = 0; i < totalMessages; ++i) { + producer.ProduceMessage("key" + to_string(i), GenerateRandomString(20), "test_topic"); + } + + // Consumer fetches messages + vector responses = consumerGroup.ConsumeMessage("test_topic", 0, totalMessages); + + auto endTime = chrono::high_resolution_clock::now(); + chrono::duration elapsed = endTime - startTime; + + cout << "TestCase1 Results: " << endl; + cout << "Total messages: " << totalMessages << endl; + cout << "Elapsed time: " << elapsed.count() << " seconds" << endl; + cout << "Throughput: " << totalMessages / elapsed.count() << " messages/second" << endl; +} + +// Test case 2: Multiple Producers, Single Consumer Group (Using Processes) +void TestCase2WithProcesses() { + cout << "Running TestCase2: Multiple Producers, Single Consumer Group (Using Processes)" << endl; + + vector bootstrap_servers = {"localhost:8080"}; + ConsumerGroup consumerGroup("tag2", "group2"); + consumerGroup.AddConsumer(bootstrap_servers, "consumer1", {"test_topic"}, {0}, {0}); + + const int messagesPerProducer = 5000; + const int numProducers = 3; + + // Fork processes for producers + for (int i = 0; i < numProducers; ++i) { + pid_t pid = fork(); + if (pid == 0) { // Child process + Producer producer(bootstrap_servers, 1000, 500, "producer" + to_string(i)); + for (int j = 0; j < messagesPerProducer; ++j) { + producer.ProduceMessage("key" + to_string(i) + "_" + to_string(j), + GenerateRandomString(20), "test_topic"); + } + exit(0); // Exit the child process + } else if (pid < 0) { + cerr << "Error forking process for producer " << i << endl; + exit(1); + } + } + + // Wait for all child processes to finish + for (int i = 0; i < numProducers; ++i) { + int status; + wait(&status); + } + + // Consumer fetches messages + vector responses = consumerGroup.ConsumeMessage("test_topic", 0, numProducers * messagesPerProducer); + + cout << "TestCase2 Results: " << endl; + cout << "Total messages produced: " << numProducers * messagesPerProducer << endl; + cout << "Messages consumed: " << responses.size() << endl; +} + +// Test case 3: High Volume Stress Test (Using Processes) +void TestCase3WithProcesses() { + cout << "Running TestCase3: High Volume Stress Test (Using Processes)" << endl; + + vector bootstrap_servers = {"localhost:8080"}; + vector producers; + ConsumerGroup consumerGroup("tag3", "group3"); + + // Adding consumers + for (int i = 0; i < 5; ++i) { + consumerGroup.AddConsumer(bootstrap_servers, "consumer" + to_string(i), {"test_topic"}, {i}, {0}); + } + + const int messagesPerProducer = 100000; + const int numProducers = 10; + + // Fork processes for producers + for (int i = 0; i < numProducers; ++i) { + pid_t pid = fork(); + if (pid == 0) { // Child process + Producer producer(bootstrap_servers, 1000, 500, "producer" + to_string(i)); + for (int j = 0; j < messagesPerProducer; ++j) { + producer.ProduceMessage("key" + to_string(i) + "_" + to_string(j), + GenerateRandomString(20), "test_topic"); + } + exit(0); // Exit the child process + } else if (pid < 0) { + cerr << "Error forking process for producer " << i << endl; + exit(1); + } + } + + // Wait for all child processes to finish + for (int i = 0; i < numProducers; ++i) { + int status; + wait(&status); + } + + cout << "All producers completed." << endl; + + // Simulate consumers fetching messages + auto startTime = chrono::high_resolution_clock::now(); + + vector responses = consumerGroup.ConsumeMessage("test_topic", 0, numProducers * messagesPerProducer); + + auto endTime = chrono::high_resolution_clock::now(); + chrono::duration elapsed = endTime - startTime; + + cout << "TestCase3 Results: " << endl; + cout << "Total messages consumed: " << responses.size() << endl; + cout << "Elapsed time: " << elapsed.count() << " seconds" << endl; + cout << "Throughput: " << (numProducers * messagesPerProducer) / elapsed.count() << " messages/second" << endl; +} + +// Main function to execute all test cases +int main() { + TestCase1(); + TestCase2WithProcesses(); + TestCase3WithProcesses(); + return 0; +} diff --git a/test/producer.h b/test/producer.h new file mode 100644 index 0000000..6856077 --- /dev/null +++ b/test/producer.h @@ -0,0 +1,23 @@ +#ifndef MESSAGE_QUEUE_PRODUCER_H +#define MESSAGE_QUEUE_PRODUCER_H + +#include +#include +#include + +class Producer { +public: + // Constructor to initialize producer with bootstrap servers + Producer(const std::vector& bootstrap_servers, int flush_threshold, int flush_interval_ms, const std::string& producer_id); + + ~Producer(); // Declare the destructor + + // Produces a message to the message queue + bool ProduceMessage(const std::string& key, const std::string& value, const std::string& topic); + +private: + class Impl; // Forward declaration of the implementation class + std::unique_ptr impl_; // Pointer to the implementation class +}; + +#endif // MESSAGE_QUEUE_PRODUCER_H