Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ include_directories("${CMAKE_CURRENT_SOURCE_DIR}/common")
include_directories("${CMAKE_CURRENT_BINARY_DIR}/producer")
include_directories("${CMAKE_CURRENT_BINARY_DIR}/consumer")
include_directories("${CMAKE_CURRENT_BINARY_DIR}/sys_admin")
include_directories("${CMAKE_CURRENT_BINARY_DIR}/test")

# dmq_grpc_proto library
add_library(dmq_grpc_proto
Expand Down Expand Up @@ -149,5 +150,15 @@ target_link_libraries(sys_admin_client
sys_admin
)

# Test executable
add_executable( performace
test/performace.cc
)
target_link_libraries (performace
producer
consumer_group
-static-libstdc++ -static-libgcc

)
# Set compiler flags for position-independent code for building shared libraries
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fPIC")
28 changes: 28 additions & 0 deletions test/consumer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#ifndef CONSUMER_H
#define CONSUMER_H

#include <memory>
#include <string>
#include <vector>

struct MessageResponse {
std::string key;
std::string value;
std::string topic;
int timestamp;
};

class Consumer {
private:
class Impl;
std::unique_ptr<Impl> impl_;
std::string consumer_id;

public:
Consumer(const std::vector<std::string> &bootstrap_servers, std::string consumer_id);
~Consumer();
std::vector<MessageResponse> ConsumeMessage(std::string group_id, std::string topic, int partition, int offset, int max_messages);
std::string get_consumer_id();
};

#endif // CONSUMER_H
36 changes: 36 additions & 0 deletions test/consumer_group.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
#ifndef CONSUMER_GROUP_H
#define CONSUMER_GROUP_H

#include <memory>
#include <string>
#include <vector>
#include <unordered_map>
#include "consumer.h"

struct topic_state {
std::string topic;
int partition;
int offset;
};

class ConsumerGroup {
private:
std::string tag;
std::string group_id;
std::vector<std::unique_ptr<Consumer>> consumers_;
std::unordered_map<std::string,
std::unordered_map<std::string, std::vector<topic_state>>> consumer_topic_state_;
std::unordered_map<std::string, std::string> topic_partition_consumer_;

bool IsTopicConsumed(std::string topic, int partition);

public:
ConsumerGroup(std::string tag, std::string group_id);
~ConsumerGroup();
bool AddConsumer(const std::vector<std::string>& bootstrap_servers, std::string consumer_id, std::vector<std::string> topics, std::vector<int> partitions, std::vector<int> offsets);
bool RemoveConsumer(std::string consumer_id);
std::vector<MessageResponse> ConsumeMessage(std::string topic, int partition, int max_messages);
void PrintConsumerGroup();
};

#endif // CONSUMER_GROUP_H
158 changes: 158 additions & 0 deletions test/performace.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
#include <iostream>
#include <vector>
#include <string>
#include <random>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>
#include "producer.h"
#include "consumer_group.h"
#include "chrono"

using namespace std;

// Utility function to generate random messages
string GenerateRandomString(size_t length) {
static const string charset = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890";
random_device rd;
mt19937 gen(rd());
uniform_int_distribution<> dis(0, charset.size() - 1);
string result;
for (size_t i = 0; i < length; ++i) {
result += charset[dis(gen)];
}
return result;
}

// Test case 1: Single Producer, Single Consumer Group
void TestCase1() {
cout << "Running TestCase1: Single Producer, Single Consumer Group" << endl;

vector<string> bootstrap_servers = {"localhost:8080"};
Producer producer(bootstrap_servers, 1000, 500, "producer1");
ConsumerGroup consumerGroup("tag1", "group1");

consumerGroup.AddConsumer(bootstrap_servers, "consumer1", {"test_topic"}, {0}, {0});

const int totalMessages = 10000;
auto startTime = chrono::high_resolution_clock::now();

// Producer sends messages
for (int i = 0; i < totalMessages; ++i) {
producer.ProduceMessage("key" + to_string(i), GenerateRandomString(20), "test_topic");
}

// Consumer fetches messages
vector<MessageResponse> responses = consumerGroup.ConsumeMessage("test_topic", 0, totalMessages);

auto endTime = chrono::high_resolution_clock::now();
chrono::duration<double> elapsed = endTime - startTime;

cout << "TestCase1 Results: " << endl;
cout << "Total messages: " << totalMessages << endl;
cout << "Elapsed time: " << elapsed.count() << " seconds" << endl;
cout << "Throughput: " << totalMessages / elapsed.count() << " messages/second" << endl;
}

// Test case 2: Multiple Producers, Single Consumer Group (Using Processes)
void TestCase2WithProcesses() {
cout << "Running TestCase2: Multiple Producers, Single Consumer Group (Using Processes)" << endl;

vector<string> bootstrap_servers = {"localhost:8080"};
ConsumerGroup consumerGroup("tag2", "group2");
consumerGroup.AddConsumer(bootstrap_servers, "consumer1", {"test_topic"}, {0}, {0});

const int messagesPerProducer = 5000;
const int numProducers = 3;

// Fork processes for producers
for (int i = 0; i < numProducers; ++i) {
pid_t pid = fork();
if (pid == 0) { // Child process
Producer producer(bootstrap_servers, 1000, 500, "producer" + to_string(i));
for (int j = 0; j < messagesPerProducer; ++j) {
producer.ProduceMessage("key" + to_string(i) + "_" + to_string(j),
GenerateRandomString(20), "test_topic");
}
exit(0); // Exit the child process
} else if (pid < 0) {
cerr << "Error forking process for producer " << i << endl;
exit(1);
}
}

// Wait for all child processes to finish
for (int i = 0; i < numProducers; ++i) {
int status;
wait(&status);
}

// Consumer fetches messages
vector<MessageResponse> responses = consumerGroup.ConsumeMessage("test_topic", 0, numProducers * messagesPerProducer);

cout << "TestCase2 Results: " << endl;
cout << "Total messages produced: " << numProducers * messagesPerProducer << endl;
cout << "Messages consumed: " << responses.size() << endl;
}

// Test case 3: High Volume Stress Test (Using Processes)
void TestCase3WithProcesses() {
cout << "Running TestCase3: High Volume Stress Test (Using Processes)" << endl;

vector<string> bootstrap_servers = {"localhost:8080"};
vector<Producer> producers;
ConsumerGroup consumerGroup("tag3", "group3");

// Adding consumers
for (int i = 0; i < 5; ++i) {
consumerGroup.AddConsumer(bootstrap_servers, "consumer" + to_string(i), {"test_topic"}, {i}, {0});
}

const int messagesPerProducer = 100000;
const int numProducers = 10;

// Fork processes for producers
for (int i = 0; i < numProducers; ++i) {
pid_t pid = fork();
if (pid == 0) { // Child process
Producer producer(bootstrap_servers, 1000, 500, "producer" + to_string(i));
for (int j = 0; j < messagesPerProducer; ++j) {
producer.ProduceMessage("key" + to_string(i) + "_" + to_string(j),
GenerateRandomString(20), "test_topic");
}
exit(0); // Exit the child process
} else if (pid < 0) {
cerr << "Error forking process for producer " << i << endl;
exit(1);
}
}

// Wait for all child processes to finish
for (int i = 0; i < numProducers; ++i) {
int status;
wait(&status);
}

cout << "All producers completed." << endl;

// Simulate consumers fetching messages
auto startTime = chrono::high_resolution_clock::now();

vector<MessageResponse> responses = consumerGroup.ConsumeMessage("test_topic", 0, numProducers * messagesPerProducer);

auto endTime = chrono::high_resolution_clock::now();
chrono::duration<double> elapsed = endTime - startTime;

cout << "TestCase3 Results: " << endl;
cout << "Total messages consumed: " << responses.size() << endl;
cout << "Elapsed time: " << elapsed.count() << " seconds" << endl;
cout << "Throughput: " << (numProducers * messagesPerProducer) / elapsed.count() << " messages/second" << endl;
}

// Main function to execute all test cases
int main() {
TestCase1();
TestCase2WithProcesses();
TestCase3WithProcesses();
return 0;
}
23 changes: 23 additions & 0 deletions test/producer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#ifndef MESSAGE_QUEUE_PRODUCER_H
#define MESSAGE_QUEUE_PRODUCER_H

#include <string>
#include <vector>
#include <memory>

class Producer {
public:
// Constructor to initialize producer with bootstrap servers
Producer(const std::vector<std::string>& bootstrap_servers, int flush_threshold, int flush_interval_ms, const std::string& producer_id);

~Producer(); // Declare the destructor

// Produces a message to the message queue
bool ProduceMessage(const std::string& key, const std::string& value, const std::string& topic);

private:
class Impl; // Forward declaration of the implementation class
std::unique_ptr<Impl> impl_; // Pointer to the implementation class
};

#endif // MESSAGE_QUEUE_PRODUCER_H