Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,8 @@
[submodule "third-party/eRPC-fork"]
path = third-party/eRPC-fork
url = https://github.com/daq-db/eRPC.git
ignore = dirty
[submodule "third-party/dpdk"]
path = third-party/dpdk
url = https://github.com/DPDK/dpdk.git
ignore = dirty
31 changes: 24 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -73,16 +73,33 @@ add_subdirectory(${3RDPARTY})

include_directories(${LIBCONFIG_INCLUDES_EXPORT})
include_directories(${3RDPARTY}/eRPC-fork/src)
include_directories(${3RDPARTY}/spdk/dpdk/build/include)

file(GLOB_RECURSE DAQDB_THIN_SOURCES lib/thin/*.cpp lib/common/*.cpp
lib/dht/*.cpp lib/KVStoreBase.cpp)
list(FILTER DAQDB_THIN_SOURCES EXCLUDE REGEX "lib/dht/.*Server.*\.cpp$")
include_directories(${3RDPARTY}/dpdk/build/include)
include_directories(lib/common lib/dht lib/thin)
include_directories(${HDRHISTOGRAM_INCLUDES_EXPORT})
add_library(daqdb_thin SHARED ${DAQDB_THIN_SOURCES})

if(ERPC_RAW_TRANSPORT)
file(GLOB_RECURSE DAQDB_THIN_SOURCES lib/thin/*.cpp lib/common/*.cpp
lib/dht/*.cpp lib/KVStoreBase.cpp)
list(FILTER DAQDB_THIN_SOURCES EXCLUDE REGEX "lib/dht/.*Server.*\.cpp$")

add_library(daqdb_thin SHARED ${DAQDB_THIN_SOURCES})
target_link_libraries(daqdb_thin ${Boost_LIBRARIES} libconfig ${ERPC_LIBS})

else(ERPC_RAW_TRANSPORT)
include_directories(lib/offload lib/pmem lib/core lib/spdk lib/primary
lib/spdk ${3RDPARTY}/spdk/include)

file(GLOB_RECURSE DAQDB_THIN_SOURCES lib/thin/*.cpp lib/common/*.cpp
lib/dht/*.cpp lib/KVStoreBase.cpp lib/spdk/*.cpp)
list(FILTER DAQDB_THIN_SOURCES EXCLUDE REGEX "lib/dht/.*Server.*\.cpp$")
set(Spdk_LIBRARIES -Wl,--whole-archive spdk -Wl,--no-whole-archive pthread
rt uuid)

add_library(daqdb_thin SHARED ${DAQDB_THIN_SOURCES})
target_link_libraries(daqdb_thin ${Spdk_LIBRARIES} ${Boost_LIBRARIES} libconfig ${ERPC_LIBS})
endif(ERPC_RAW_TRANSPORT)

target_compile_definitions(daqdb_thin PRIVATE THIN_LIB=1)
target_link_libraries(daqdb_thin ${Boost_LIBRARIES} libconfig ${ERPC_LIBS})

if(NOT THIN_LIB)
include_directories(${3RDPARTY}/pmdk/src/include)
Expand Down
3 changes: 2 additions & 1 deletion apps/minidaq/MinidaqAroNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/

#include "MinidaqAroNode.h"
#include <libpmem.h>

namespace DaqDB {

Expand All @@ -35,7 +36,7 @@ void MinidaqAroNode::_Task(Key &&key, std::atomic<std::uint64_t> &cnt,
throw;
}

memcpy(value.data(), _data_buffer, value.size());
pmem_memcpy_nodrain(value.data(), _data_buffer, value.size());

#ifdef WITH_INTEGRITY_CHECK
_FillBuffer(key, value.data(), value.size());
Expand Down
6 changes: 2 additions & 4 deletions apps/minidaq/MinidaqFfNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/

#include "MinidaqFfNode.h"
Expand Down Expand Up @@ -80,11 +80,9 @@ void MinidaqFfNode::_Task(Key &&key, std::atomic<std::uint64_t> &cnt,
int baseId = _PickSubdetector();
bool accept = _Accept();

mKeyPtr->runId = _runId;

for (int i = 0; i < _PickNFragments(); i++) {
/** @todo change to GetRange once implemented */
mKeyPtr->subdetectorId = baseId + i;
mKeyPtr->componentId = baseId + i;
DaqDB::Value value;
try {
value = _kvs->Get(key);
Expand Down
12 changes: 6 additions & 6 deletions apps/minidaq/MinidaqFfNodeSeq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/

#include "MinidaqFfNodeSeq.h"

namespace DaqDB {

thread_local int MinidaqFfNodeSeq::_eventId;
thread_local uint64_t MinidaqFfNodeSeq::_eventId;

MinidaqFfNodeSeq::MinidaqFfNodeSeq(KVStoreBase *kvs) : MinidaqFfNode(kvs) {}

Expand All @@ -35,9 +35,9 @@ Key MinidaqFfNodeSeq::_NextKey() {
? AllocOptions(KeyValAttribute::NOT_BUFFERED)
: AllocOptions(KeyValAttribute::KVS_BUFFERED));
MinidaqKey *mKeyPtr = reinterpret_cast<MinidaqKey *>(key.data());
mKeyPtr->runId = _runId;
mKeyPtr->subdetectorId = _baseId;
mKeyPtr->eventId = _eventId;
mKeyPtr->detectorId = 0;
mKeyPtr->componentId = _baseId;
memcpy(&mKeyPtr->eventId, &_eventId, sizeof(mKeyPtr->eventId));
_eventId += _nTh;
return key;
}
Expand All @@ -51,7 +51,7 @@ void MinidaqFfNodeSeq::_Task(Key &&key, std::atomic<std::uint64_t> &cnt,

for (int i = 0; i < _PickNFragments(); i++) {
/** @todo change to GetRange once implemented */
mKeyPtr->subdetectorId = baseId + i;
mKeyPtr->componentId = baseId + i;
DaqDB::Value value;
nRetries = 0;
while (nRetries < _maxRetries) {
Expand Down
6 changes: 4 additions & 2 deletions apps/minidaq/MinidaqFfNodeSeq.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/

#pragma once
Expand All @@ -32,6 +32,8 @@ class MinidaqFfNodeSeq : public MinidaqFfNode {
Key _NextKey();
std::string _GetType();

static thread_local int _eventId;
static thread_local uint64_t _eventId;
static_assert(sizeof(_eventId) >= sizeof(MinidaqKey().eventId),
"Event Id field of MinidaqKey is too big");
};
}
17 changes: 13 additions & 4 deletions apps/minidaq/MinidaqNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/

#include <atomic>
Expand Down Expand Up @@ -261,7 +261,9 @@ void MinidaqNode::SaveSummary(std::string &fs, std::string &tname) {
char MinidaqNode::_GetBufferByte(const Key &key, size_t i) {
const MinidaqKey *mKeyPtr =
reinterpret_cast<const MinidaqKey *>(key.data());
return ((mKeyPtr->eventId + i) % 256);
const char *eventId = reinterpret_cast<const char *>(&mKeyPtr->eventId);
return *(eventId +
(i % (sizeof(mKeyPtr->eventId) / sizeof(mKeyPtr->eventId[0]))));
}

void MinidaqNode::_FillBuffer(const Key &key, char *buf, size_t s) {
Expand All @@ -286,8 +288,15 @@ bool MinidaqNode::_CheckBuffer(const Key &key, const char *buf, size_t s) {
if (!err) {
err = true;
msg << "Integrity check failed (" << _GetType()
<< ") EventId=" << mKeyPtr->eventId
<< " SubdetectorId=" << mKeyPtr->subdetectorId << std::endl;
<< ") EventId=0x";
for (int j = (sizeof(mKeyPtr->eventId) /
sizeof(mKeyPtr->eventId[0])) -
1;
j >= 0; j--)
msg << std::hex
<< static_cast<unsigned int>(mKeyPtr->eventId[j]);
msg << std::dec << " SubdetectorId=" << mKeyPtr->componentId
<< std::endl;
_nIntegrityErrors++;
}
msg << " buf[" << i << "] = "
Expand Down
12 changes: 4 additions & 8 deletions apps/minidaq/MinidaqNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/

#pragma once
Expand All @@ -27,12 +27,9 @@
namespace DaqDB {

struct __attribute__((packed)) MinidaqKey {
MinidaqKey() : eventId(0), subdetectorId(0), runId(0){};
MinidaqKey(uint64_t e, uint16_t s, uint16_t r)
: eventId(e), subdetectorId(s), runId(r) {}
uint16_t runId;
uint16_t subdetectorId;
uint64_t eventId;
uint8_t eventId[5];
uint8_t detectorId;
uint16_t componentId;
};

class MinidaqNode {
Expand Down Expand Up @@ -72,7 +69,6 @@ class MinidaqNode {
virtual std::string _GetType() = 0;

KVStoreBase *_kvs;
int _runId = 599;
int _nTh = 1; // number of worker threads
bool _localOnly = false; // single-node benchmark
#ifdef WITH_INTEGRITY_CHECK
Expand Down
15 changes: 8 additions & 7 deletions apps/minidaq/MinidaqRoNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,16 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/

#include "MinidaqRoNode.h"
#include <libpmem.h>

namespace DaqDB {

thread_local int MinidaqRoNode::_eventId;
thread_local constexpr char MinidaqRoNode::_data_buffer [100000];
thread_local uint64_t MinidaqRoNode::_eventId;
thread_local constexpr char MinidaqRoNode::_data_buffer[100000];

MinidaqRoNode::MinidaqRoNode(KVStoreBase *kvs) : MinidaqNode(kvs) {}

Expand All @@ -34,9 +35,9 @@ Key MinidaqRoNode::_NextKey() {
? AllocOptions(KeyValAttribute::NOT_BUFFERED)
: AllocOptions(KeyValAttribute::KVS_BUFFERED));
MinidaqKey *mKeyPtr = reinterpret_cast<MinidaqKey *>(key.data());
mKeyPtr->runId = _runId;
mKeyPtr->subdetectorId = _id;
mKeyPtr->eventId = _eventId;
mKeyPtr->detectorId = 0;
mKeyPtr->componentId = _id;
memcpy(&mKeyPtr->eventId, &_eventId, sizeof(mKeyPtr->eventId));
_eventId += _nTh;
return key;
}
Expand All @@ -51,7 +52,7 @@ void MinidaqRoNode::_Task(Key &&key, std::atomic<std::uint64_t> &cnt,
throw;
}

memcpy(value.data(), _data_buffer, value.size());
pmem_memcpy_nodrain(value.data(), _data_buffer, value.size());

#ifdef WITH_INTEGRITY_CHECK
_FillBuffer(key, value.data(), value.size());
Expand Down
6 changes: 4 additions & 2 deletions apps/minidaq/MinidaqRoNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/

#pragma once
Expand All @@ -37,7 +37,9 @@ class MinidaqRoNode : public MinidaqNode {

size_t _fSize = 0;
int _id = 0;
static thread_local int _eventId;
static thread_local uint64_t _eventId;
static_assert(sizeof(_eventId) >= sizeof(MinidaqKey().eventId),
"Event Id field of MinidaqKey is too big");
static thread_local constexpr char _data_buffer[100000] = " ";
};
}
8 changes: 4 additions & 4 deletions apps/minidaq/minidaq.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/

#include <boost/filesystem.hpp>
Expand Down Expand Up @@ -108,9 +108,9 @@ static std::unique_ptr<DaqDB::KVStoreBase> openKVS() {
options.pmem.poolPath = pmem_path;
options.pmem.totalSize = pmem_size;
options.pmem.allocUnitSize = fSize;
options.key.field(0, sizeof(DaqDB::MinidaqKey::runId));
options.key.field(1, sizeof(DaqDB::MinidaqKey::subdetectorId));
options.key.field(2, sizeof(DaqDB::MinidaqKey::eventId), true);
options.key.field(0, sizeof(DaqDB::MinidaqKey::eventId), true);
options.key.field(1, sizeof(DaqDB::MinidaqKey::detectorId));
options.key.field(2, sizeof(DaqDB::MinidaqKey::componentId));
options.runtime.numOfPollers = nPoolers;
options.runtime.maxReadyKeys = maxReadyKeys;
if (satellite) {
Expand Down
6 changes: 3 additions & 3 deletions examples/clinode/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ void initKvsOptions(DaqDB::Options &options, const std::string &configFile) {
options.pmem.totalSize = DEFAULT_PMEM_POOL_SIZE;
options.pmem.allocUnitSize = DEFAULT_PMEM_ALLOC_UNIT_SIZE;

options.key.field(0, sizeof(CliNodeKey::runId));
options.key.field(1, sizeof(CliNodeKey::subdetectorId));
options.key.field(2, sizeof(CliNodeKey::eventId), true);
options.key.field(0, sizeof(CliNodeKey::eventId), true);
options.key.field(1, sizeof(CliNodeKey::detectorId));
options.key.field(2, sizeof(CliNodeKey::componentId));

options.offload.allocUnitSize = DEFAULT_OFFLOAD_ALLOC_UNIT_SIZE;

Expand Down
9 changes: 3 additions & 6 deletions examples/clinode/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,9 @@
#include <config/Configuration.h>

struct __attribute__((packed)) CliNodeKey {
CliNodeKey() : eventId(0), subdetectorId(0), runId(0) {};
CliNodeKey(uint64_t e, uint16_t s, uint16_t r)
: eventId(e), subdetectorId(s), runId(r) {}
uint16_t runId;
uint16_t subdetectorId;
uint64_t eventId;
uint8_t eventId[5];
uint8_t detectorId;
uint16_t componentId;
};

void initKvsOptions(DaqDB::Options &options, const std::string &configFile);
4 changes: 1 addition & 3 deletions examples/clinode/nodeCli.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,7 @@ DaqDB::Key nodeCli::_strToKey(const std::string &key) {

DaqDB::Key keyBuff = _spKVStore->AllocKey(KeyValAttribute::NOT_BUFFERED);
CliNodeKey *cliKeyPtr = reinterpret_cast<CliNodeKey *>(keyBuff.data());
cliKeyPtr->runId = 0;
cliKeyPtr->subdetectorId = 0;
cliKeyPtr->eventId = 0;
memset(cliKeyPtr, 0, sizeof(CliNodeKey));

memcpy(&(cliKeyPtr->eventId), key.data(), key.size());
return keyBuff;
Expand Down
9 changes: 8 additions & 1 deletion include/daqdb/Options.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* limitations under the License.
*/

#pragma once
Expand Down Expand Up @@ -110,6 +110,13 @@ struct KeyFieldDescriptor {
struct KeyDescriptor {
size_t nfields() const { return _fields.size(); }

size_t size() const {
size_t size = 0;
for (auto &f : _fields)
size += f.size;
return size;
}

void field(size_t idx, size_t size, bool isPrimary = false) {
if (nfields() <= idx)
_fields.resize(idx + 1);
Expand Down
Loading