Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions src/stats/stats_printer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
#include "vma/util/utils.h"
#include "vma/util/vma_stats.h"
#include "vma/lwip/tcp.h"
#include "vma/vma_extra.h"
#include "vma/util/sys_vars.h"
#include "vma/util/vtypes.h"

typedef enum {
e_K = 1024,
Expand Down
12 changes: 9 additions & 3 deletions src/stats/stats_publisher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ void stats_data_reader::handle_timer_expired(void *ctx)

void stats_data_reader::register_to_timer()
{
m_timer_handler = g_p_event_handler_manager->register_timer_event(STATS_PUBLISHER_TIMER_PERIOD, g_p_stats_data_reader, PERIODIC_TIMER, 0);
m_timer_handler = g_p_event_handler_manager->register_timer_event(STATS_PUBLISHER_TIMER_PERIOD, this, PERIODIC_TIMER, 0);
}

void stats_data_reader::add_data_reader(void* local_addr, void* shm_addr, int size)
Expand Down Expand Up @@ -150,6 +150,8 @@ void vma_shmem_stats_open(vlog_levels_t** p_p_vma_log_level, uint8_t** p_p_vma_l
}
BULLSEYE_EXCLUDE_BLOCK_END

vlog_printf(VLOG_DEBUG,"%s:%d: Allocated g_p_stats_data_reader pointer as '%p'\n", __func__, __LINE__, g_p_stats_data_reader);

shmem_size = SHMEM_STATS_SIZE(safe_mce_sys().stats_fd_num_max);
buf = malloc(shmem_size);
if (buf == NULL)
Expand Down Expand Up @@ -277,8 +279,12 @@ void vma_shmem_stats_close()
g_sh_mem = NULL;
g_p_vlogger_level = NULL;
g_p_vlogger_details = NULL;
delete g_p_stats_data_reader;
g_p_stats_data_reader = NULL;
if (g_p_stats_data_reader != NULL) {
stats_data_reader* p = g_p_stats_data_reader;
g_p_stats_data_reader = NULL;
p->set_destroying_state(true);
delete p;
}
}

void vma_stats_instance_create_socket_block(socket_stats_t* local_stats_addr)
Expand Down
2 changes: 1 addition & 1 deletion src/stats/stats_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
#include <fcntl.h>
#include <errno.h>
#include <list>
#include <netinet/in.h>
#include <signal.h>
#include <getopt.h> /* getopt()*/
#include <errno.h>
#include <dirent.h>
#include <string.h>
#include <vector>
Expand Down
1 change: 1 addition & 0 deletions src/vma/dev/time_converter_ptp.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#define TIME_CONVERTER_PTP_H

#include <infiniband/verbs.h>
#include "vma/ib/base/verbs_extra.h"
#include "vma/event/timer_handler.h"
#include <vma/util/sys_vars.h>
#include "time_converter.h"
Expand Down
38 changes: 15 additions & 23 deletions src/vma/event/delta_timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
using namespace std::chrono;

#define IS_NODE_INVALID(_node_) \
(!_node_ || !_node_->handler || (_node_->req_type < 0 || _node_->req_type >= INVALID_TIMER))
(!_node_ || !_node_->handler.load() || (_node_->req_type < 0 || _node_->req_type >= INVALID_TIMER))


timer::timer()
Expand All @@ -54,7 +54,7 @@ timer::~timer()

void timer::add_new_timer(unsigned int timeout_msec, timer_node_t* node, timer_handler* handler, void* user_data, timer_req_type_t req_type)
{
node->handler = handler;
node->handler.store(handler);
node->req_type = req_type;
node->user_data = user_data;
node->orig_time_msec = milliseconds(timeout_msec);
Expand Down Expand Up @@ -94,22 +94,22 @@ void timer::remove_timer(timer_node_t* node, timer_handler *handler)
if (!node) {
node = m_list_head;
while (node) {
if (node->handler == handler) // node found
if (node->handler.load() == handler) // node found
break;
node = node->next;
}
}

// Here we MUST have a valid node pointer
BULLSEYE_EXCLUDE_BLOCK_START
if (IS_NODE_INVALID(node) || (node->handler != handler)) {
if (IS_NODE_INVALID(node) || (node->handler.load() != handler)) {
tmr_logfunc("bad <node,handler> combo for removale (%p,%p)", node, handler);
return;
}
BULLSEYE_EXCLUDE_BLOCK_END

// Invalidate node before freeing it
node->handler = NULL;
node->handler.store(nullptr);
node->req_type = INVALID_TIMER;

// Remove & Free node
Expand All @@ -125,18 +125,18 @@ void timer::remove_all_timers(timer_handler *handler)

// Look for handler in the list if node wasen't indicated
while (node) {
if (node->handler == handler) {// node found
if (node->handler.load() == handler) {// node found
node_tmp = node;
node = node->next;
// Here we MUST have a valid node pointer
BULLSEYE_EXCLUDE_BLOCK_START
if (IS_NODE_INVALID(node_tmp) || (node_tmp->handler != handler)) {
if (IS_NODE_INVALID(node_tmp) || (node_tmp->handler.load() != handler)) {
tmr_logfunc("bad <node,handler> combo for removale (%p,%p)", node_tmp, handler);
continue;
}
BULLSEYE_EXCLUDE_BLOCK_END
// Invalidate node before freeing it
node_tmp->handler = NULL;
node_tmp->handler.store(nullptr);
node_tmp->req_type = INVALID_TIMER;
remove_from_list(node_tmp);
// Remove & Free node
Expand Down Expand Up @@ -200,19 +200,11 @@ void timer::process_registered_timers()
timer_node_t* iter = m_list_head;
timer_node_t* next_iter;
while (iter && (iter->delta_time_msec == milliseconds(0))) {
tmr_logfuncall("timer expired on %p", iter->handler);

/* Special check is need to protect
* from using destroyed object pointed by handler
* See unregister_timer_event()
* Object can be destoyed from another thread (lock protection)
* and from current thread (lock and lock count condition)
*/
if (iter->handler &&
!iter->lock_timer.trylock() &&
(1 == iter->lock_timer.is_locked_by_me())) {
iter->handler->handle_timer_expired(iter->user_data);
iter->lock_timer.unlock();
timer_handler * handler = iter->handler.load();
tmr_logfuncall("timer expired on %p", handler);

if (handler) {
handler->safe_handle_timer_expired(iter->user_data);
}
next_iter = iter->next;

Expand All @@ -225,13 +217,13 @@ void timer::process_registered_timers()
break;

case ONE_SHOT_TIMER:
remove_timer(iter, iter->handler);
remove_timer(iter, handler);
break;

BULLSEYE_EXCLUDE_BLOCK_START
case INVALID_TIMER:
default:
tmr_logwarn("invalid timer expired on %p", iter->handler);
tmr_logwarn("invalid timer expired on %p", handler);
break;
}
BULLSEYE_EXCLUDE_BLOCK_END
Expand Down
18 changes: 7 additions & 11 deletions src/vma/event/delta_timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
#ifndef DELTA_TIMER_H
#define DELTA_TIMER_H

#include <atomic>
#include <chrono>
#include "utils/lock_wrapper.h"

Expand All @@ -32,18 +33,13 @@ struct timer_node_t {
std::chrono::milliseconds delta_time_msec;
/* the orig timer requested (saved in order to re-register periodic timers) */
std::chrono::milliseconds orig_time_msec;
/* control thread-safe access to handler. Recursive because unregister_timer_event()
* can be called from handle_timer_expired()
* that is under trylock() inside process_registered_timers
*/
lock_spin_recursive lock_timer;
/* link to the context registered */
timer_handler* handler;
void* user_data;
timers_group* group;
timer_req_type_t req_type;
struct timer_node_t* next;
struct timer_node_t* prev;
std::atomic<timer_handler*> handler;
void* user_data;
timers_group* group;
timer_req_type_t req_type;
struct timer_node_t* next;
struct timer_node_t* prev;
}; // used by the list

class timer
Expand Down
28 changes: 7 additions & 21 deletions src/vma/event/event_handler_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ void* event_handler_manager::register_timer_event(int timeout_msec, timer_handle
}
BULLSEYE_EXCLUDE_BLOCK_END

timer_node_t* timer_node = (timer_node_t*)node;
timer_node->lock_timer=lock_spin_recursive("timer");

reg_action_t reg_action;
memset(&reg_action, 0, sizeof(reg_action));
reg_action.type = REGISTER_TIMER;
Expand Down Expand Up @@ -116,30 +113,19 @@ void event_handler_manager::unregister_timer_event(timer_handler* handler, void*
reg_action.type = UNREGISTER_TIMER;
reg_action.info.timer.handler = handler;
reg_action.info.timer.node = node;

/* Special protection is needed to avoid scenario when deregistration is done
* during timer_handler object destruction, timer node itself is not removed
* and time for this timer node is expired. In this case there is no guarantee
* to operate with timer_handler object.
* See timer::process_registered_timers()
* Do just lock() to protect timer_handler inside process_registered_timers()
*/
if (node) {
timer_node_t* timer_node = (timer_node_t*)node;
timer_node->lock_timer.lock();
}

post_new_reg_action(reg_action);
}

void event_handler_manager::unregister_timers_event_and_delete(timer_handler* handler)
{
evh_logdbg("timer handler '%p'", handler);
reg_action_t reg_action;
memset(&reg_action, 0, sizeof(reg_action));
reg_action.type = UNREGISTER_TIMERS_AND_DELETE;
reg_action.info.timer.handler = handler;
post_new_reg_action(reg_action);
if( handler != nullptr && !handler->set_destroying_state()) {
reg_action_t reg_action;
memset(&reg_action, 0, sizeof(reg_action));
reg_action.type = UNREGISTER_TIMERS_AND_DELETE;
reg_action.info.timer.handler = handler;
post_new_reg_action(reg_action);
}
}

void event_handler_manager::register_ibverbs_event(int fd, event_handler_ibverbs *handler,
Expand Down
58 changes: 56 additions & 2 deletions src/vma/event/timer_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,70 @@
#ifndef TIMER_HANDLER_H
#define TIMER_HANDLER_H

#include <atomic>

#include "vlogger/vlogger.h"
#include "utils/lock_wrapper.h"

/**
* simple timer notification.
* Any class that inherit timer_handler should also inherit cleanable_obj, and use clean_obj instead of delete.
* It must implement the clean_obj method to delete the object from the internal thread.
*/
class timer_handler
{
public:
virtual ~timer_handler() {};
private:
lock_spin m_handle_mutex{"timer_handler"};
std::atomic<bool> m_destroy_in_progress{false};
protected:
virtual void handle_timer_expired(void* user_data) = 0;

public:
timer_handler() = default;

virtual ~timer_handler() {
if( !m_destroy_in_progress.load()) {
m_destroy_in_progress.store(true);
vlog_printf(VLOG_DEBUG, "Destroying timer_handler without destroy in progress.\n");
}
{
m_handle_mutex.lock();
m_handle_mutex.unlock();
}
}

void safe_handle_timer_expired(void* user_data) {
if (m_handle_mutex.trylock() == 0) {
if(!m_destroy_in_progress.load()) {
handle_timer_expired(user_data);
}
m_handle_mutex.unlock();
}
}

/**
* Sets the destroying state of the object to indicate that destruction is in progress.
*
* If `wait_for_handler` is set to true, this method will ensure that the mutex
* used for handling operations (m_handle_mutex) is acquired and released, effectively
* waiting for any pending handler operation to complete.
*
* @param wait_for_handler If true, waits for the handler mutex to ensure
* handler finish before proceeding.
* Defaults to false.
*
* @return The previous state of the destruction flag.
* Returns true if a destruction process was already in progress before
* this method was called, otherwise false.
*/
bool set_destroying_state(bool wait_for_handler = false) {
bool result = m_destroy_in_progress.exchange(true);
if( wait_for_handler ) {
m_handle_mutex.lock();
m_handle_mutex.unlock();
}
return result;
}
};

#endif
2 changes: 1 addition & 1 deletion src/vma/sock/fd_collection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -635,7 +635,7 @@ void fd_collection::handle_timer_expired(void* user_data)
if (si_tcp) {
//In case of TCP socket progress the TCP connection
fdcoll_logfunc("Call to handler timer of TCP socket:%d", (*itr)->get_fd());
si_tcp->handle_timer_expired(NULL);
si_tcp->safe_handle_timer_expired(NULL);
}
itr++;
}
Expand Down
12 changes: 8 additions & 4 deletions src/vma/sock/sockinfo_tcp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4798,8 +4798,12 @@ void tcp_timers_collection::handle_timer_expired(void* user_data)
NOT_IN_USE(user_data);
timer_node_t* iter = m_p_intervals[m_n_location];
while (iter) {
__log_funcall("timer expired on %p", iter->handler);
iter->handler->handle_timer_expired(iter->user_data);
timer_handler * handler = iter->handler.load();
__log_funcall("timer expired on %p", handler);
if ( handler )
{
handler->safe_handle_timer_expired(iter->user_data);
}
iter = iter->next;
}
m_n_location = (m_n_location + 1) % m_n_intervals_size;
Expand All @@ -4810,7 +4814,7 @@ void tcp_timers_collection::handle_timer_expired(void* user_data)

void tcp_timers_collection::add_new_timer(timer_node_t* node, timer_handler* handler, void* user_data)
{
node->handler = handler;
node->handler.store(handler);
node->user_data = user_data;
node->group = this;
node->next = NULL;
Expand Down Expand Up @@ -4859,7 +4863,7 @@ void tcp_timers_collection::remove_timer(timer_node_t* node)
}
}

__log_dbg("TCP timer handler [%p] was removed", node->handler);
__log_dbg("TCP timer handler [%p] was removed", node->handler.load());

free(node);
}
Expand Down