Skip to content

feat(multi-threading): Code changed in example3 and example4 #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file added bin/example1
Binary file not shown.
Binary file added bin/example2
Binary file not shown.
Binary file added bin/example3
Binary file not shown.
Binary file added bin/example4
Binary file not shown.
100 changes: 100 additions & 0 deletions example/best_buy.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <cstring>
#include <unordered_set>
#include <chrono>
#include "../src/RecordLoader.h"
#include "../src/BitmapIterator.h"
#include "../src/BitmapConstructor.h"

using namespace std;

// Define a mutex for thread-safe appending to the output string
mutex counts_mutex;

// Map to store the counts for each JSONPath query
unordered_map<string, int> counts;


// The query function processes the JSON records according to the specified keys
void query(BitmapIterator* iter) {
if (iter->isObject()) {
// Extracting the "categoryPath" IDs as per the BB JSONPath Query
if (iter->moveToKey("categoryPath")) {
if (iter->down() && iter->isArray()) {
for (int idx = 1; idx <= 2; idx++) { // Adjusted to get the 2nd and 3rd elements
if (iter->moveToIndex(idx)) {
if (iter->down() && iter->isObject() && iter->moveToKey("id")) {
counts_mutex.lock();
counts["BB"]++; // Increment the count for the "BB" query
counts_mutex.unlock();

}
iter->up();
}
}
iter->up();
}
}
}
}


// Function to process a subset of records
void process_records(RecordSet* record_set, int start, int end) {
for (int i = start; i < end; i++) {
Bitmap* bm = BitmapConstructor::construct((*record_set)[i], 1, 3);
BitmapIterator* iter = BitmapConstructor::getIterator(bm);
query(iter);
delete iter;
delete bm;
}
}

int main() {
// Initialize counts map with all query IDs set to 0
counts["BB"] = 0;

char* file_path = "../dataset/bestbuy_small_records.json";
RecordSet* record_set = RecordLoader::loadRecords(file_path);
if (record_set->size() == 0) {
cout << "Record loading failed." << endl;
return -1;
}

int thread_num = std::thread::hardware_concurrency(); // Use as many threads as there are CPU cores
vector<thread> threads;

// Start the timer
auto start_time = chrono::high_resolution_clock::now();

// Calculate the number of records each thread should process
int num_recs_per_thread = record_set->size() / thread_num;
for (int i = 0; i < thread_num; i++) {
int start = i * num_recs_per_thread;
int end = (i == thread_num - 1) ? record_set->size() : (i + 1) * num_recs_per_thread;
threads.emplace_back(process_records, record_set, start, end);
}

// Wait for all threads to finish
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}

// Stop the timer
auto end_time = chrono::high_resolution_clock::now();
chrono::duration<double, milli> elapsed_time = end_time - start_time;

delete record_set;

// Output the counts in the specified format
cout << "ID\tJSONPath Query\tNumber of Matches" << endl;
cout << "BB\t{$.categoryPath[1:3].id}\t" << counts["BB"] << endl;
cout << "Execution time: " << elapsed_time.count() << " ms" << endl;

return 0;
}
72 changes: 48 additions & 24 deletions example/example3.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,19 @@
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <cstring>
#include <unordered_set>
#include "../src/RecordLoader.h"
#include "../src/BitmapIterator.h"
#include "../src/BitmapConstructor.h"

// {$.user.id, $.retweet_count}
using namespace std;

// Define a mutex for thread-safe appending to the output string
mutex output_mutex;

// The query function processes the JSON records according to the specified keys
string query(BitmapIterator* iter) {
string output = "";
if (iter->isObject()) {
@@ -31,38 +42,51 @@ string query(BitmapIterator* iter) {
return output;
}

// Function to process a subset of records
void process_records(RecordSet* record_set, int start, int end, string& output) {
for (int i = start; i < end; i++) {
Bitmap* bm = BitmapConstructor::construct((*record_set)[i], 1, 2); // Each thread uses 1 thread internally
BitmapIterator* iter = BitmapConstructor::getIterator(bm);
string local_output = query(iter);
delete iter;
delete bm;

// Lock the mutex before appending to the shared output string
output_mutex.lock();
output.append(local_output);
output_mutex.unlock();
}
}

int main() {
char* file_path = "../dataset/twitter_sample_small_records.json";
RecordSet* record_set = RecordLoader::loadRecords(file_path);
if (record_set->size() == 0) {
cout<<"record loading fails."<<endl;
cout << "record loading fails." << endl;
return -1;
}
string output = "";

// fix the number of threads to 1 for small records scenario; parallel bitmap construction is TBD.
int thread_num = 1;

/* set the number of levels of bitmaps to create, either based on the
* query or the JSON records. E.g., query $[*].user.id needs three levels
* (level 0, 1, 2), but the record may be of more than three levels
*/
int level_num = 2;

/* process the records one by one: for each one, first build bitmap, then perform
* the query with a bitmap iterator
*/
int num_recs = record_set->size();
Bitmap* bm = NULL;
for (int i = 0; i < num_recs; i++) {
bm = BitmapConstructor::construct((*record_set)[i], thread_num, level_num);
BitmapIterator* iter = BitmapConstructor::getIterator(bm);
output.append(query(iter));
delete iter;
int thread_num = std::thread::hardware_concurrency(); // Use as many threads as there are CPU cores
std::vector<std::thread> threads;

// Calculate the number of records each thread should process
int num_recs_per_thread = record_set->size() / thread_num;
for (int i = 0; i < thread_num; i++) {
int start = i * num_recs_per_thread;
int end = (i == thread_num - 1) ? record_set->size() : (i + 1) * num_recs_per_thread;
threads.emplace_back(process_records, record_set, start, end, ref(output));
}

// Wait for all threads to finish
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
delete bm;

delete record_set;
cout<<"matches are: "<<output<<endl;

cout << "matches are: " << output << endl;
return 0;
}
70 changes: 47 additions & 23 deletions example/example4.cpp
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <cstring>
#include <unordered_set>
#include "../src/RecordLoader.h"
#include "../src/BitmapIterator.h"
#include "../src/BitmapConstructor.h"

// $.categoryPath[1:3].id
using namespace std;

// Define a mutex for thread-safe appending to the output string
mutex output_mutex;

// The query function processes the JSON records according to the specified keys
string query(BitmapIterator* iter) {
string output = "";
if (iter->isObject() && iter->moveToKey("categoryPath")) {
if (iter->down() == false) return output; /* value of "categoryPath" */
if (iter->down() == false) return output; // value of "categoryPath"
if (iter->isArray()) {
for (int idx = 1; idx <= 2; ++idx) {
// 2nd and 3rd elements inside "categoryPath" array
@@ -27,38 +38,51 @@ string query(BitmapIterator* iter) {
return output;
}

// Function to process a subset of records
void process_records(RecordSet* record_set, int start, int end, string& output) {
for (int i = start; i < end; i++) {
Bitmap* bm = BitmapConstructor::construct((*record_set)[i], 1, 3); // Each thread uses 1 thread internally
BitmapIterator* iter = BitmapConstructor::getIterator(bm);
string local_output = query(iter);
delete iter;
delete bm;

// Lock the mutex before appending to the shared output string
output_mutex.lock();
output.append(local_output);
output_mutex.unlock();
}
}

int main() {
char* file_path = "../dataset/bestbuy_sample_small_records.json";
RecordSet* record_set = RecordLoader::loadRecords(file_path);
if (record_set->size() == 0) {
cout<<"record loading fails."<<endl;
cout << "record loading fails." << endl;
return -1;
}
string output = "";

int thread_num = std::thread::hardware_concurrency(); // Use as many threads as there are CPU cores
std::vector<std::thread> threads;

// fix the number of threads to 1 for small records scenario; parallel bitmap construction is TBD.
int thread_num = 1;

/* set the number of levels of bitmaps to create, either based on the
* query or the JSON records. E.g., query $[*].user.id needs three levels
* (level 0, 1, 2), but the record may be of more than three levels
*/
int level_num = 3;
// Calculate the number of records each thread should process
int num_recs_per_thread = record_set->size() / thread_num;
for (int i = 0; i < thread_num; i++) {
int start = i * num_recs_per_thread;
int end = (i == thread_num - 1) ? record_set->size() : (i + 1) * num_recs_per_thread;
threads.emplace_back(process_records, record_set, start, end, ref(output));
}

/* process the records one by one: for each one, first build bitmap, then perform
* the query with a bitmap iterator
*/
int num_recs = record_set->size();
Bitmap* bm = NULL;
for (int i = 0; i < num_recs; i++) {
bm = BitmapConstructor::construct((*record_set)[i], thread_num, level_num);
BitmapIterator* iter = BitmapConstructor::getIterator(bm);
output.append(query(iter));
delete iter;
// Wait for all threads to finish
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}
delete bm;

delete record_set;

cout<<"matches are: "<<output<<endl;
cout << "matches are: " << output << endl;
return 0;
}
100 changes: 100 additions & 0 deletions example/twitter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <cstring>
#include <unordered_set>
#include <chrono>
#include "../src/RecordLoader.h"
#include "../src/BitmapIterator.h"
#include "../src/BitmapConstructor.h"

using namespace std;

// Define a mutex for thread-safe appending to the output string
mutex counts_mutex;

// Map to store the counts for each JSONPath query
unordered_map<string, int> counts;


// The query function processes the JSON records according to the specified keys
void query(BitmapIterator* iter) {
if (iter->isObject()) {

if (iter->moveToKey("user")) {
if (iter->down()) {
if (iter->moveToKey("lang")) {
counts_mutex.lock();
counts["TT"]++; // Increment the count for the "TT" query for $.user.lang
counts_mutex.unlock();

}
iter->up();
}
}
if (iter->moveToKey("lang")) {
counts_mutex.lock();
counts["TT"]++; // Increment the count for the "TT" query for $.lang
counts_mutex.unlock();
}
}
}


// Function to process a subset of records
void process_records(RecordSet* record_set, int start, int end) {
for (int i = start; i < end; i++) {
Bitmap* bm = BitmapConstructor::construct((*record_set)[i], 1, 3);
BitmapIterator* iter = BitmapConstructor::getIterator(bm);
query(iter);
delete iter;
delete bm;
}
}

int main() {
// Initialize counts map with all query IDs set to 0
counts["TT"] = 0;

char* file_path = "../dataset/twitter_small_records.json";
RecordSet* record_set = RecordLoader::loadRecords(file_path);
if (record_set->size() == 0) {
cout << "Record loading failed." << endl;
return -1;
}

int thread_num = std::thread::hardware_concurrency(); // Use as many threads as there are CPU cores
vector<thread> threads;

// Calculate the number of records each thread should process
int num_recs_per_thread = record_set->size() / thread_num;
for (int i = 0; i < thread_num; i++) {
int start = i * num_recs_per_thread;
int end = (i == thread_num - 1) ? record_set->size() : (i + 1) * num_recs_per_thread;
threads.emplace_back(process_records, record_set, start, end);
}

// Start the timer
auto start_time = chrono::high_resolution_clock::now();

// Wait for all threads to finish
for (auto& t : threads) {
if (t.joinable()) {
t.join();
}
}

// Stop the timer
auto end_time = chrono::high_resolution_clock::now();
chrono::duration<double, milli> elapsed_time = end_time - start_time;

// Output the counts in the specified format
cout << "ID\tJSONPath Query\tNumber of Matches" << endl;
cout << "TT\t{$.user.lang, $.lang}\t" << counts["TT"] << endl;
cout << "Execution time: " << elapsed_time.count() << " ms" << endl;

delete record_set;

return 0;
}
29 changes: 9 additions & 20 deletions makefile
Original file line number Diff line number Diff line change
@@ -1,34 +1,23 @@
DIR = bin
EXEC1 = $(DIR)/example1
EXEC2 = $(DIR)/example2
EXEC3 = $(DIR)/example3
EXEC4 = $(DIR)/example4
TARGET = $(EXEC1) ${EXEC2} ${EXEC3} ${EXEC4}
\
EXEC4 = $(DIR)/best_buy
EXEC5 = $(DIR)/twitter
TARGET = ${EXEC4} ${EXEC5}
all: $(TARGET)

CC = g++
CC_FLAGS = -O3 -std=c++11 -mavx -mavx2 -msse -msse2 -msse4 -msse4.2 -mpclmul
POST_FLAGS = -lpthread -mcmodel=medium -static-libstdc++

SOURCE1 = src/*.cpp example/example1.cpp
$(EXEC1): $(SOURCE1)
mkdir -p $(DIR)
$(CC) $(CC_FLAGS) -o $(EXEC1) $(SOURCE1) $(POST_FLAGS)

SOURCE2 = src/*.cpp example/example2.cpp
$(EXEC2): $(SOURCE2)
mkdir -p $(DIR)
$(CC) $(CC_FLAGS) -o $(EXEC2) $(SOURCE2) $(POST_FLAGS)

SOURCE3 = src/*.cpp example/example3.cpp
$(EXEC3): $(SOURCE3)
mkdir -p $(DIR)
$(CC) $(CC_FLAGS) -o $(EXEC3) $(SOURCE3) $(POST_FLAGS)

SOURCE4 = src/*.cpp example/example4.cpp
SOURCE4 = src/*.cpp example/best_buy.cpp
$(EXEC4): $(SOURCE4)
mkdir -p $(DIR)
$(CC) $(CC_FLAGS) -o $(EXEC4) $(SOURCE4) $(POST_FLAGS)
SOURCE5 = src/*.cpp example/twitter.cpp
$(EXEC5): $(SOURCE5)
mkdir -p $(DIR)
$(CC) $(CC_FLAGS) -o $(EXEC5) $(SOURCE5) $(POST_FLAGS)

clean:
-$(RM) $(TARGET)
1 change: 1 addition & 0 deletions src/LocalBitmap.cpp
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@
#include <sys/file.h>
#include <unistd.h>
#include <sched.h>
#include <cstdint>
#include <unordered_map>
using namespace std;

1 change: 1 addition & 0 deletions src/SerialBitmap.cpp
Original file line number Diff line number Diff line change
@@ -19,6 +19,7 @@
#include <unistd.h>
#include <sched.h>
#include <unordered_map>
#include <cstdint>
using namespace std;

SerialBitmap::SerialBitmap() {