diff --git a/src/libvgpu.c b/src/libvgpu.c index 83158cfb..e038b39f 100644 --- a/src/libvgpu.c +++ b/src/libvgpu.c @@ -872,21 +872,30 @@ void preInit(){ void postInit(){ allocator_init(); map_cuda_visible_devices(); - int lock_ret = try_lock_unified_lock(); - if (lock_ret != 0) { - LOG_WARN("try_lock_unified_lock failed, skipping set_task_pid"); - pidfound=0; + + // Use shared memory semaphore to serialize host PID detection + // Returns 1 if lock acquired, 0 if timeout (skip detection) + int lock_acquired = lock_postinit(); + nvmlReturn_t res = NVML_SUCCESS; + + if (lock_acquired) { + // Lock acquired - safe to call set_task_pid() + res = set_task_pid(); + unlock_postinit(); } else { - nvmlReturn_t res = set_task_pid(); - try_unlock_unified_lock(); - if (res != NVML_SUCCESS) { - LOG_WARN("SET_TASK_PID FAILED."); - pidfound = 0; - } else { - pidfound = 1; - } + // Timeout - another process likely crashed holding the lock + // Skip host PID detection for this process + LOG_WARN("Skipped host PID detection due to lock timeout"); + res = NVML_ERROR_TIMEOUT; } + LOG_MSG("Initialized"); + if (res != NVML_SUCCESS) { + LOG_WARN("SET_TASK_PID FAILED - using container PID for accounting"); + pidfound = 0; + } else { + pidfound = 1; + } //add_gpu_device_memory_usage(getpid(),0,context_size,0); env_utilization_switch = set_env_utilization_switch(); diff --git a/src/multiprocess/multiprocess_memory_limit.c b/src/multiprocess/multiprocess_memory_limit.c index 6eaddce4..e4367bb7 100755 --- a/src/multiprocess/multiprocess_memory_limit.c +++ b/src/multiprocess/multiprocess_memory_limit.c @@ -34,6 +34,15 @@ #define SEM_WAIT_RETRY_TIMES 30 #endif +// Longer timeout for postinit since set_task_pid() with adaptive polling can take several seconds +#ifndef SEM_WAIT_TIME_POSTINIT +#define SEM_WAIT_TIME_POSTINIT 30 +#endif + +#ifndef SEM_WAIT_RETRY_TIMES_POSTINIT +#define SEM_WAIT_RETRY_TIMES_POSTINIT 10 +#endif + int pidfound; int ctx_activate[32]; @@ -261,7 +270,7 @@ size_t get_gpu_memory_monitor(const int dev) { return total; } -// Lock-free memory usage aggregation +// Lock-free memory usage aggregation with seqlock for consistent snapshots size_t get_gpu_memory_usage(const int dev) { LOG_INFO("get_gpu_memory_usage_lockfree dev=%d", dev); ensure_initialized(); @@ -271,17 +280,67 @@ size_t get_gpu_memory_usage(const int dev) { // Lock-free read with acquire semantics for proc_num int proc_num = atomic_load_explicit(®ion_info.shared_region->proc_num, memory_order_acquire); - for (i = 0; i < proc_num; i++) { - // Atomic loads with relaxed ordering (aggregation doesn't need strict ordering) - int32_t pid = atomic_load_explicit(®ion_info.shared_region->procs[i].pid, memory_order_relaxed); - int32_t hostpid = atomic_load_explicit(®ion_info.shared_region->procs[i].hostpid, memory_order_relaxed); - uint64_t proc_usage = atomic_load_explicit( - ®ion_info.shared_region->procs[i].used[dev].total, - memory_order_relaxed); + for (i=0; i < proc_num; i++) { + shrreg_proc_slot_t* slot = ®ion_info.shared_region->procs[i]; + uint64_t proc_usage; + uint64_t seq1, seq2; + int retry_count = 0; + + // Seqlock read protocol: retry until we get a consistent snapshot + // CRITICAL: Memory checks require accurate data, cannot use stale reads + do { + // Read sequence number (must be even = no write in progress) + seq1 = atomic_load_explicit(&slot->seqlock, memory_order_acquire); + + // If odd, writer is in progress, back off with exponential delay + while (seq1 & 1) { + // Exponential backoff to reduce contention + if (retry_count < 5) { + // First 5 retries: just CPU pause (fast path) + #if defined(__x86_64__) || defined(__i386__) + __asm__ __volatile__("pause" ::: "memory"); + #elif defined(__aarch64__) + __asm__ __volatile__("yield" ::: "memory"); + #endif + } else if (retry_count < 20) { + // Next 15 retries: 1μs delay + usleep(1); + } else if (retry_count < 100) { + // Next 80 retries: 10μs delay + usleep(10); + } else { + // After 100 retries: 100μs delay + usleep(100); + // Log if we're spinning for a very long time + if (retry_count % 100 == 0) { + LOG_DEBUG("Seqlock spinning for slot %d, retry %d (writer active)", i, retry_count); + } + } + + retry_count++; + seq1 = atomic_load_explicit(&slot->seqlock, memory_order_acquire); + } + + // Read the data with acquire semantics + proc_usage = atomic_load_explicit(&slot->used[dev].total, memory_order_acquire); + + // Memory barrier to prevent reordering + atomic_thread_fence(memory_order_acquire); + + // Read sequence number again + seq2 = atomic_load_explicit(&slot->seqlock, memory_order_acquire); + + // If sequence numbers match and still even, read was consistent + } while (seq1 != seq2); + + // Consistent read obtained + int32_t pid = atomic_load_explicit(&slot->pid, memory_order_relaxed); + int32_t hostpid = atomic_load_explicit(&slot->hostpid, memory_order_relaxed); LOG_INFO("dev=%d pid=%d host pid=%d i=%lu", dev, pid, hostpid, proc_usage); total+=proc_usage; } + total+=initial_offset; return total; } @@ -382,26 +441,37 @@ uint64_t nvml_get_device_memory_usage(const int dev) { return usage; } -// Lock-free memory add using atomics +// Lock-free memory add using atomics with seqlock for consistent reads int add_gpu_device_memory_usage(int32_t pid, int cudadev, size_t usage, int type) { LOG_INFO("add_gpu_device_memory_lockfree:%d %d->%d %lu", pid, cudadev, cuda_to_nvml_map(cudadev), usage); + int dev = cuda_to_nvml_map(cudadev); ensure_initialized(); // Fast path: use cached slot pointer for our own process if (pid == getpid() && region_info.my_slot != NULL) { - atomic_fetch_add_explicit(®ion_info.my_slot->used[dev].total, usage, memory_order_relaxed); + shrreg_proc_slot_t* slot = region_info.my_slot; + + // Seqlock protocol: increment to odd (write in progress) + atomic_fetch_add_explicit(&slot->seqlock, 1, memory_order_release); + + // Perform updates with release semantics for visibility + atomic_fetch_add_explicit(&slot->used[dev].total, usage, memory_order_release); switch (type) { case 0: - atomic_fetch_add_explicit(®ion_info.my_slot->used[dev].context_size, usage, memory_order_relaxed); + atomic_fetch_add_explicit(&slot->used[dev].context_size, usage, memory_order_release); break; case 1: - atomic_fetch_add_explicit(®ion_info.my_slot->used[dev].module_size, usage, memory_order_relaxed); + atomic_fetch_add_explicit(&slot->used[dev].module_size, usage, memory_order_release); break; case 2: - atomic_fetch_add_explicit(®ion_info.my_slot->used[dev].data_size, usage, memory_order_relaxed); + atomic_fetch_add_explicit(&slot->used[dev].data_size, usage, memory_order_release); break; } + + // Seqlock protocol: increment to even (write complete) + atomic_fetch_add_explicit(&slot->seqlock, 1, memory_order_release); + LOG_INFO("gpu_device_memory_added_lockfree:%d %d %lu", pid, dev, usage); return 0; } @@ -412,30 +482,28 @@ int add_gpu_device_memory_usage(int32_t pid, int cudadev, size_t usage, int type for (i=0; i < proc_num; i++) { int32_t slot_pid = atomic_load_explicit(®ion_info.shared_region->procs[i].pid, memory_order_acquire); if (slot_pid == pid) { - atomic_fetch_add_explicit( - ®ion_info.shared_region->procs[i].used[dev].total, - usage, - memory_order_relaxed); + shrreg_proc_slot_t* slot = ®ion_info.shared_region->procs[i]; + + // Seqlock protocol: increment to odd (write in progress) + atomic_fetch_add_explicit(&slot->seqlock, 1, memory_order_release); + + // Perform updates + atomic_fetch_add_explicit(&slot->used[dev].total, usage, memory_order_release); switch (type) { case 0: - atomic_fetch_add_explicit( - ®ion_info.shared_region->procs[i].used[dev].context_size, - usage, - memory_order_relaxed); + atomic_fetch_add_explicit(&slot->used[dev].context_size, usage, memory_order_release); break; case 1: - atomic_fetch_add_explicit( - ®ion_info.shared_region->procs[i].used[dev].module_size, - usage, - memory_order_relaxed); + atomic_fetch_add_explicit(&slot->used[dev].module_size, usage, memory_order_release); break; case 2: - atomic_fetch_add_explicit( - ®ion_info.shared_region->procs[i].used[dev].data_size, - usage, - memory_order_relaxed); + atomic_fetch_add_explicit(&slot->used[dev].data_size, usage, memory_order_release); break; } + + // Seqlock protocol: increment to even (write complete) + atomic_fetch_add_explicit(&slot->seqlock, 1, memory_order_release); + LOG_INFO("gpu_device_memory_added_lockfree:%d %d %lu", pid, dev, usage); return 0; } @@ -445,7 +513,7 @@ int add_gpu_device_memory_usage(int32_t pid, int cudadev, size_t usage, int type return -1; } -// Lock-free memory remove using atomics +// Lock-free memory remove using atomics with seqlock for consistent reads int rm_gpu_device_memory_usage(int32_t pid, int cudadev, size_t usage, int type) { LOG_INFO("rm_gpu_device_memory_lockfree:%d %d->%d %d:%lu", pid, cudadev, cuda_to_nvml_map(cudadev), type, usage); int dev = cuda_to_nvml_map(cudadev); @@ -453,19 +521,29 @@ int rm_gpu_device_memory_usage(int32_t pid, int cudadev, size_t usage, int type) // Fast path: use cached slot pointer for our own process if (pid == getpid() && region_info.my_slot != NULL) { - atomic_fetch_sub_explicit(®ion_info.my_slot->used[dev].total, usage, memory_order_relaxed); + shrreg_proc_slot_t* slot = region_info.my_slot; + + // Seqlock protocol: increment to odd (write in progress) + atomic_fetch_add_explicit(&slot->seqlock, 1, memory_order_release); + + // Perform updates with release semantics + atomic_fetch_sub_explicit(&slot->used[dev].total, usage, memory_order_release); switch (type) { case 0: - atomic_fetch_sub_explicit(®ion_info.my_slot->used[dev].context_size, usage, memory_order_relaxed); + atomic_fetch_sub_explicit(&slot->used[dev].context_size, usage, memory_order_release); break; case 1: - atomic_fetch_sub_explicit(®ion_info.my_slot->used[dev].module_size, usage, memory_order_relaxed); + atomic_fetch_sub_explicit(&slot->used[dev].module_size, usage, memory_order_release); break; case 2: - atomic_fetch_sub_explicit(®ion_info.my_slot->used[dev].data_size, usage, memory_order_relaxed); + atomic_fetch_sub_explicit(&slot->used[dev].data_size, usage, memory_order_release); break; } - uint64_t new_total = atomic_load_explicit(®ion_info.my_slot->used[dev].total, memory_order_relaxed); + + // Seqlock protocol: increment to even (write complete) + atomic_fetch_add_explicit(&slot->seqlock, 1, memory_order_release); + + uint64_t new_total = atomic_load_explicit(&slot->used[dev].total, memory_order_acquire); LOG_INFO("after delete_lockfree:%lu", new_total); return 0; } @@ -476,33 +554,29 @@ int rm_gpu_device_memory_usage(int32_t pid, int cudadev, size_t usage, int type) for (i = 0; i < proc_num; i++) { int32_t slot_pid = atomic_load_explicit(®ion_info.shared_region->procs[i].pid, memory_order_acquire); if (slot_pid == pid) { - atomic_fetch_sub_explicit( - ®ion_info.shared_region->procs[i].used[dev].total, - usage, - memory_order_relaxed); + shrreg_proc_slot_t* slot = ®ion_info.shared_region->procs[i]; + + // Seqlock protocol: increment to odd (write in progress) + atomic_fetch_add_explicit(&slot->seqlock, 1, memory_order_release); + + // Perform updates + atomic_fetch_sub_explicit(&slot->used[dev].total, usage, memory_order_release); switch (type) { case 0: - atomic_fetch_sub_explicit( - ®ion_info.shared_region->procs[i].used[dev].context_size, - usage, - memory_order_relaxed); + atomic_fetch_sub_explicit(&slot->used[dev].context_size, usage, memory_order_release); break; case 1: - atomic_fetch_sub_explicit( - ®ion_info.shared_region->procs[i].used[dev].module_size, - usage, - memory_order_relaxed); + atomic_fetch_sub_explicit(&slot->used[dev].module_size, usage, memory_order_release); break; case 2: - atomic_fetch_sub_explicit( - ®ion_info.shared_region->procs[i].used[dev].data_size, - usage, - memory_order_relaxed); + atomic_fetch_sub_explicit(&slot->used[dev].data_size, usage, memory_order_release); break; } - uint64_t new_total = atomic_load_explicit( - ®ion_info.shared_region->procs[i].used[dev].total, - memory_order_relaxed); + + // Seqlock protocol: increment to even (write complete) + atomic_fetch_add_explicit(&slot->seqlock, 1, memory_order_release); + + uint64_t new_total = atomic_load_explicit(&slot->used[dev].total, memory_order_acquire); LOG_INFO("after delete_lockfree:%lu", new_total); return 0; } @@ -572,38 +646,60 @@ void exit_handler() { return; } shared_region_t* region = region_info.shared_region; - int slot = 0; - LOG_MSG("Calling exit handler %d",getpid()); - struct timespec sem_ts; - get_timespec(SEM_WAIT_TIME_ON_EXIT, &sem_ts); - int status = sem_timedwait(®ion->sem, &sem_ts); - if (status == 0) { // just give up on lock failure - region->owner_pid = region_info.pid; - while (slot < region->proc_num) { - if (region->procs[slot].pid == region_info.pid) { - memset(region->procs[slot].used,0,sizeof(device_memory_t)*CUDA_DEVICE_MAX_COUNT); - memset(region->procs[slot].device_util,0,sizeof(device_util_t)*CUDA_DEVICE_MAX_COUNT); - region->proc_num--; - region->procs[slot] = region->procs[region->proc_num]; - break; - } - slot++; + if (region == NULL) { + return; + } + + int32_t my_pid = region_info.pid; + LOG_MSG("Cleanup on exit for PID %d", my_pid); + + // ======================================================================== + // CRITICAL CLEANUP (Must succeed, no lock needed) + // ======================================================================== + + // 1. If we're holding owner_pid, clear it atomically + size_t current_owner = atomic_load_explicit(®ion->owner_pid, memory_order_acquire); + if (current_owner == (size_t)my_pid) { + LOG_WARN("Exit while holding owner_pid, releasing atomically"); + // Use CAS to ensure we only clear if it's still us + size_t expected = (size_t)my_pid; + if (atomic_compare_exchange_strong_explicit(®ion->owner_pid, &expected, 0, + memory_order_release, memory_order_acquire)) { + LOG_DEBUG("Released owner_pid and posting semaphore"); + sem_post(®ion->sem); // Unlock the semaphore + } + } + + // 2. Mark our process slot as exited (atomic, no lock needed) + // Set PID to 0 so it's detected as dead by clear_proc_slot_nolock() + for (int slot = 0; slot < SHARED_REGION_MAX_PROCESS_NUM; slot++) { + int32_t slot_pid = atomic_load_explicit(®ion->procs[slot].pid, memory_order_acquire); + if (slot_pid == my_pid) { + LOG_DEBUG("Marking process slot %d as dead (PID %d)", slot, my_pid); + // Atomically set PID to 0 - this marks the slot as available + atomic_store_explicit(®ion->procs[slot].pid, 0, memory_order_release); + // Also set status to 0 (inactive) + atomic_store_explicit(®ion->procs[slot].status, 0, memory_order_release); + break; } - __sync_synchronize(); - region->owner_pid = 0; - sem_post(®ion->sem); - } else { - LOG_WARN("Failed to take lock on exit: errno=%d", errno); } + + // That's it! The slot will be physically removed by clear_proc_slot_nolock() + // when the next process acquires the lock. This is lazy cleanup. + + LOG_MSG("Exit cleanup complete for PID %d", my_pid); } void lock_shrreg() { - struct timespec sem_ts; - get_timespec(SEM_WAIT_TIME, &sem_ts); shared_region_t* region = region_info.shared_region; int trials = 0; while (1) { + // CRITICAL: Create fresh timeout for each iteration! + // If created outside loop, timestamp becomes stale after first timeout + struct timespec sem_ts; + get_timespec(SEM_WAIT_TIME, &sem_ts); + int status = sem_timedwait(®ion->sem, &sem_ts); SEQ_POINT_MARK(SEQ_ACQUIRE_SEMLOCK_OK); @@ -615,29 +711,47 @@ void lock_shrreg() { trials = 0; break; } else if (errno == ETIMEDOUT) { - LOG_WARN("Lock shrreg timeout, try fix (%d:%ld)", region_info.pid,region->owner_pid); - int32_t current_owner = region->owner_pid; - if (current_owner != 0 && (current_owner == region_info.pid || - proc_alive(current_owner) == PROC_STATE_NONALIVE)) { - LOG_WARN("Owner proc dead (%d), try fix", current_owner); - if (0 == fix_lock_shrreg()) { - break; - } - } else { - trials++; - if (trials > SEM_WAIT_RETRY_TIMES) { - LOG_WARN("Fail to lock shrreg in %d seconds", - SEM_WAIT_RETRY_TIMES * SEM_WAIT_TIME); - if (current_owner == 0) { - LOG_WARN("fix current_owner 0>%d",region_info.pid); - region->owner_pid = region_info.pid; - if (0 == fix_lock_shrreg()) { - break; - } + trials++; + size_t current_owner = atomic_load_explicit(®ion->owner_pid, memory_order_acquire); + + if (trials <= 3 || trials % 5 == 0) { // Log first 3, then every 5th + LOG_WARN("Lock shrreg timeout (trial %d/%d), owner=%ld", + trials, SEM_WAIT_RETRY_TIMES, current_owner); + } + + // SIGKILL RECOVERY: Check if owner is dead (the ONLY case where exit cleanup fails) + if (current_owner != 0) { + int owner_status = proc_alive((int32_t)current_owner); + if (owner_status == PROC_STATE_NONALIVE) { + LOG_WARN("Owner %ld is dead (was SIGKILL'd), cleaning up stale lock", current_owner); + // Use CAS so only one process does this + size_t expected = current_owner; + if (atomic_compare_exchange_strong_explicit(®ion->owner_pid, &expected, 0, + memory_order_release, memory_order_acquire)) { + LOG_WARN("Cleared dead owner_pid and posting semaphore"); + sem_post(®ion->sem); // Unlock + usleep(10000); // 10ms for semaphore to propagate + continue; // Retry immediately } + // Another process is handling it, wait a bit + usleep(100000); // 100ms + continue; } continue; // slow wait path } + + // If we're still waiting after many tries, something is seriously wrong + if (trials > 30) { // 30 × 10s = 5 minutes + LOG_ERROR("Cannot acquire lock after 5 minutes, owner=%ld", current_owner); + if (current_owner != 0 && proc_alive((int32_t)current_owner) == PROC_STATE_ALIVE) { + LOG_ERROR("Owner is still ALIVE - this is a deadlock bug!"); + } else { + LOG_ERROR("This should not happen - please report this bug"); + } + LOG_ERROR("Workaround: Delete /tmp/cudevshr.cache and restart all processes"); + exit(-1); + } + continue; // Keep retrying } else { LOG_ERROR("Failed to lock shrreg: %d", errno); } @@ -657,25 +771,90 @@ void unlock_shrreg() { SEQ_POINT_MARK(SEQ_RELEASE_SEMLOCK_OK); } +int lock_postinit() { + shared_region_t* region = region_info.shared_region; + int trials = 0; + while (1) { + // CRITICAL: Create fresh timeout for each iteration! + // If created outside loop, timestamp becomes stale after first timeout + // Use longer timeout for postinit since set_task_pid() can take several seconds + struct timespec sem_ts; + get_timespec(SEM_WAIT_TIME_POSTINIT, &sem_ts); + + int status = sem_timedwait(®ion->sem_postinit, &sem_ts); + if (status == 0) { + // Lock acquired successfully + LOG_DEBUG("Acquired postinit lock after %d waits (PID %d)", trials, getpid()); + return 1; // Success + } else if (errno == ETIMEDOUT) { + trials++; + LOG_MSG("Waiting for postinit lock (trial %d/%d, waited %ds, PID %d)", + trials, SEM_WAIT_RETRY_TIMES_POSTINIT, trials * SEM_WAIT_TIME_POSTINIT, getpid()); + + // After many retries, give up + if (trials > SEM_WAIT_RETRY_TIMES_POSTINIT) { + LOG_ERROR("Postinit lock timeout after %d seconds - another process may have crashed", + SEM_WAIT_RETRY_TIMES_POSTINIT * SEM_WAIT_TIME_POSTINIT); + LOG_ERROR("Skipping host PID detection for this process (will use container PID)"); + return 0; // Timeout - didn't acquire lock + } + continue; + } else { + LOG_ERROR("Failed to lock postinit semaphore: errno=%d", errno); + // Don't give up - keep retrying + trials++; + continue; + } + } +} + +void unlock_postinit() { + shared_region_t* region = region_info.shared_region; + sem_post(®ion->sem_postinit); +} + int clear_proc_slot_nolock(int do_clear) { int slot = 0; int res=0; + int cleaned_pid_zero = 0; + int cleaned_dead = 0; shared_region_t* region = region_info.shared_region; while (slot < region->proc_num) { - int32_t pid = region->procs[slot].pid; - if (pid != 0) { - if (do_clear > 0 && proc_alive(pid) == PROC_STATE_NONALIVE) { - LOG_WARN("Kick dead proc %d", pid); - } else { - slot++; - continue; - } + int32_t pid = atomic_load_explicit(®ion->procs[slot].pid, memory_order_acquire); + + // Skip slots that are already marked as dead (PID=0) by exit cleanup + if (pid == 0) { + LOG_DEBUG("Removing slot %d with PID=0 (marked dead by exit cleanup)", slot); + cleaned_pid_zero++; res=1; region->proc_num--; region->procs[slot] = region->procs[region->proc_num]; __sync_synchronize(); + + // Don't increment slot - check the moved element + continue; } + + // Only check proc_alive() if do_clear is enabled and PID is non-zero + // Limit to 10 checks per call to avoid holding lock too long + if (do_clear > 0 && cleaned_dead < 10 && proc_alive(pid) == PROC_STATE_NONALIVE) { + LOG_WARN("Kick dead proc %d (proc_alive check)", pid); + cleaned_dead++; + res = 1; + region->proc_num--; + region->procs[slot] = region->procs[region->proc_num]; + __sync_synchronize(); + // Don't increment slot - check the moved element + continue; + } + + // Slot is valid, move to next + slot++; + } + if (cleaned_pid_zero > 0 || cleaned_dead > 0) { + LOG_INFO("Cleaned %d PID=0 slots, %d dead proc slots (proc_num now %d)", + cleaned_pid_zero, cleaned_dead, region->proc_num); } return res; } @@ -698,6 +877,7 @@ void init_proc_slot_withlock() { for (i=0; i < proc_num; i++) { int32_t slot_pid = atomic_load_explicit(®ion->procs[i].pid, memory_order_acquire); if (slot_pid == current_pid) { + atomic_store_explicit(®ion->procs[i].seqlock, 0, memory_order_relaxed); // Reset seqlock atomic_store_explicit(®ion->procs[i].status, 1, memory_order_release); // Zero out atomics @@ -718,6 +898,7 @@ void init_proc_slot_withlock() { if (!found) { // Initialize new slot with atomics + atomic_store_explicit(®ion->procs[proc_num].seqlock, 0, memory_order_relaxed); // Start with even (no write) atomic_store_explicit(®ion->procs[proc_num].pid, current_pid, memory_order_release); atomic_store_explicit(®ion->procs[proc_num].hostpid, 0, memory_order_relaxed); atomic_store_explicit(®ion->procs[proc_num].status, 1, memory_order_release); @@ -855,6 +1036,10 @@ void try_create_shrreg() { if (sem_init(®ion->sem, 1, 1) != 0) { LOG_ERROR("Fail to init sem %s: errno=%d", shr_reg_file, errno); } + if (sem_init(®ion->sem_postinit, 1, 1) != 0) { + LOG_ERROR("Fail to init sem_postinit %s: errno=%d", shr_reg_file, errno); + } + atomic_store_explicit(®ion->sm_init_flag, 0, memory_order_relaxed); atomic_store_explicit(®ion->utilization_switch, 1, memory_order_relaxed); atomic_store_explicit(®ion->recent_kernel, 2, memory_order_relaxed); diff --git a/src/multiprocess/multiprocess_memory_limit.h b/src/multiprocess/multiprocess_memory_limit.h index 870e4da0..600375e0 100755 --- a/src/multiprocess/multiprocess_memory_limit.h +++ b/src/multiprocess/multiprocess_memory_limit.h @@ -78,11 +78,12 @@ typedef struct { typedef struct { _Atomic int32_t pid; // Atomic to detect slot allocation _Atomic int32_t hostpid; + _Atomic uint64_t seqlock; // Sequence lock for consistent snapshots device_memory_t used[CUDA_DEVICE_MAX_COUNT]; _Atomic uint64_t monitorused[CUDA_DEVICE_MAX_COUNT]; device_util_t device_util[CUDA_DEVICE_MAX_COUNT]; _Atomic int32_t status; - uint64_t unused[3]; + uint64_t unused[2]; } shrreg_proc_slot_t; typedef char uuid[96]; @@ -94,6 +95,7 @@ typedef struct { _Atomic int32_t sm_init_flag; _Atomic size_t owner_pid; sem_t sem; // Only for process slot add/remove + sem_t sem_postinit; // For serializing postInit() host PID detection uint64_t device_num; uuid uuids[CUDA_DEVICE_MAX_COUNT]; uint64_t limit[CUDA_DEVICE_MAX_COUNT]; @@ -165,6 +167,9 @@ int init_device_info(); void lock_shrreg(); void unlock_shrreg(); +int lock_postinit(); // Returns 1 on success, 0 on timeout +void unlock_postinit(); + //Setspec of the corresponding device int setspec(); //Remove quit process diff --git a/src/multiprocess/multiprocess_utilization_watcher.c b/src/multiprocess/multiprocess_utilization_watcher.c index 9ac6b59a..5e141f4d 100644 --- a/src/multiprocess/multiprocess_utilization_watcher.c +++ b/src/multiprocess/multiprocess_utilization_watcher.c @@ -128,7 +128,6 @@ int get_used_gpu_utilization(int *userutil,int *sysprocnum) { unsigned int nvmlCounts; CHECK_NVML_API(nvmlDeviceGetCount(&nvmlCounts)); - lock_shrreg(); int devi,cudadev; for (devi=0;devi