Skip to content

Commit 6fcbd72

Browse files
committed
fix: Deadlock under load
1 parent e0f143b commit 6fcbd72

File tree

1 file changed

+103
-42
lines changed

1 file changed

+103
-42
lines changed

module.cc

Lines changed: 103 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,33 @@ struct AsyncLocalStorageLookup {
3838

3939
// Structure to hold information for each thread/isolate
4040
struct ThreadInfo {
41-
// Thread name
41+
// Mutex protecting this thread's mutable data (poll_state, last_seen)
42+
// Does NOT protect async_store (immutable after creation)
43+
mutable std::mutex mutex;
44+
// Thread name (immutable after creation)
4245
std::string thread_name;
4346
// Last time this thread was seen in milliseconds since epoch
44-
milliseconds last_seen;
47+
milliseconds last_seen; // Protected by mutex
4548
// Optional async local storage associated with this thread
46-
std::optional<AsyncLocalStorageLookup> async_store;
49+
// Using shared_ptr to safely share with async tasks even if ThreadInfo is
50+
// erased
51+
std::shared_ptr<std::optional<AsyncLocalStorageLookup>> async_store;
4752
// Some JSON serialized state sent via threadPoll
48-
std::string poll_state;
53+
std::string poll_state; // Protected by mutex
54+
55+
// Constructor lets the map build ThreadInfo in place (std::mutex makes it non-movable/non-copyable)
56+
ThreadInfo(std::string name, milliseconds seen,
57+
std::shared_ptr<std::optional<AsyncLocalStorageLookup>> store,
58+
std::string state)
59+
: thread_name(std::move(name)), last_seen(seen),
60+
async_store(std::move(store)), poll_state(std::move(state)) {}
4961
};
5062

51-
static std::mutex threads_mutex;
63+
// Separate mutexes for different concerns:
64+
// - threads_map_mutex: protects the threads map structure
65+
// (insert/erase/iteration)
66+
// - ThreadInfo::mutex: protects each thread's mutable data
67+
static std::mutex threads_map_mutex;
5268
// Map to hold all registered threads and their information
5369
static std::unordered_map<v8::Isolate *, ThreadInfo> threads = {};
5470

@@ -316,7 +332,7 @@ std::string GetThreadState(Isolate *isolate,
316332

317333
struct InterruptArgs {
318334
std::promise<JsStackTrace> promise;
319-
const std::optional<AsyncLocalStorageLookup> *store;
335+
std::shared_ptr<std::optional<AsyncLocalStorageLookup>> store;
320336
};
321337

322338
// Function to be called when an isolate's execution is interrupted
@@ -346,9 +362,9 @@ static void ExecutionInterrupted(Isolate *isolate, void *data) {
346362
}
347363

348364
// Function to capture the stack trace of a single isolate
349-
JsStackTrace
350-
CaptureStackTrace(Isolate *isolate,
351-
const std::optional<AsyncLocalStorageLookup> &store) {
365+
JsStackTrace CaptureStackTrace(
366+
Isolate *isolate,
367+
const std::shared_ptr<std::optional<AsyncLocalStorageLookup>> &store) {
352368
if (isolate->IsExecutionTerminating()) {
353369
return JsStackTrace{{}, ""};
354370
}
@@ -357,8 +373,20 @@ CaptureStackTrace(Isolate *isolate,
357373
auto future = promise.get_future();
358374

359375
// The v8 isolate must be interrupted to capture the stack trace
376+
// Note: Even if we time out below, the interrupt may still fire later.
377+
// The InterruptArgs holds a shared_ptr to keep data alive until the callback
378+
// executes.
360379
isolate->RequestInterrupt(ExecutionInterrupted,
361-
new InterruptArgs{std::move(promise), &store});
380+
new InterruptArgs{std::move(promise), store});
381+
382+
// Wait with timeout to prevent infinite hang if isolate never processes
383+
// interrupt (e.g., stuck in native code or terminating)
384+
if (future.wait_for(std::chrono::seconds(5)) == std::future_status::timeout) {
385+
// Timeout occurred. The InterruptArgs is intentionally leaked - it will be
386+
// deleted by ExecutionInterrupted if/when the callback eventually fires.
387+
// The shared_ptr keeps the store data alive.
388+
return JsStackTrace{{}, ""};
389+
}
362390

363391
return future.get();
364392
}
@@ -369,34 +397,45 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
369397

370398
std::vector<ThreadResult> results;
371399

400+
std::vector<std::future<ThreadResult>> futures;
372401
{
373-
std::vector<std::future<ThreadResult>> futures;
374-
std::lock_guard<std::mutex> lock(threads_mutex);
402+
// Only need map lock to safely iterate and copy shared_ptrs
403+
// No deadlock risk because we release lock before fut.get()
404+
std::lock_guard<std::mutex> lock(threads_map_mutex);
375405
for (auto &thread : threads) {
376406
auto thread_isolate = thread.first;
377407
auto &thread_info = thread.second;
378408

379409
if (thread_isolate == capture_from_isolate)
380410
continue;
381411

412+
// Copy immutable data and shared_ptrs (no individual thread lock needed)
413+
// thread_name and async_store are immutable after creation
382414
auto thread_name = thread_info.thread_name;
383-
auto poll_state = thread_info.poll_state;
415+
auto async_store_ptr = thread_info.async_store;
416+
417+
// For poll_state, we need to lock the thread's mutex briefly
418+
std::string poll_state;
419+
{
420+
std::lock_guard<std::mutex> thread_lock(thread_info.mutex);
421+
poll_state = thread_info.poll_state;
422+
}
384423

385424
futures.emplace_back(std::async(
386425
std::launch::async,
387-
[thread_isolate, thread_name, poll_state](
388-
const std::optional<AsyncLocalStorageLookup> &async_store)
389-
-> ThreadResult {
390-
return ThreadResult{thread_name,
391-
CaptureStackTrace(thread_isolate, async_store),
392-
poll_state};
393-
},
394-
std::cref(thread_info.async_store)));
426+
[thread_isolate, thread_name, poll_state,
427+
async_store_ptr]() -> ThreadResult {
428+
return ThreadResult{
429+
thread_name, CaptureStackTrace(thread_isolate, async_store_ptr),
430+
poll_state};
431+
}));
395432
}
433+
}
396434

397-
for (auto &fut : futures) {
398-
results.emplace_back(fut.get());
399-
}
435+
// Wait for all futures to complete AFTER releasing the lock
436+
// No deadlock: threads_map_mutex is already released, so ThreadPoll's brief map lookup cannot block behind these waits
437+
for (auto &fut : futures) {
438+
results.emplace_back(fut.get());
400439
}
401440

402441
auto current_context = capture_from_isolate->GetCurrentContext();
@@ -502,19 +541,25 @@ void CaptureStackTraces(const FunctionCallbackInfo<Value> &args) {
502541
// destroyed
503542
void Cleanup(void *arg) {
504543
auto isolate = static_cast<Isolate *>(arg);
505-
std::lock_guard<std::mutex> lock(threads_mutex);
544+
std::lock_guard<std::mutex> lock(threads_map_mutex);
506545
threads.erase(isolate);
507546
}
508547

509548
void RegisterThreadInternal(
510549
Isolate *isolate, const std::string &thread_name,
511550
std::optional<AsyncLocalStorageLookup> async_store) {
512551

513-
std::lock_guard<std::mutex> lock(threads_mutex);
514-
auto found = threads.find(isolate);
515-
if (found == threads.end()) {
516-
threads.emplace(isolate, ThreadInfo{thread_name, milliseconds::zero(),
517-
std::move(async_store), ""});
552+
std::lock_guard<std::mutex> lock(threads_map_mutex);
553+
// try_emplace constructs the ThreadInfo in-place if the key doesn't exist
554+
// The mutex will be default-constructed automatically by ThreadInfo
555+
// constructor
556+
auto [iter, inserted] = threads.try_emplace(
557+
isolate, thread_name, milliseconds::zero(),
558+
std::make_shared<std::optional<AsyncLocalStorageLookup>>(
559+
std::move(async_store)),
560+
"");
561+
562+
if (inserted) {
518563
// Register a cleanup hook to remove this thread when the isolate is
519564
// destroyed
520565
node::AddEnvironmentCleanupHook(isolate, Cleanup, isolate);
@@ -646,19 +691,28 @@ void ThreadPoll(const FunctionCallbackInfo<Value> &args) {
646691
poll_state = JSONStringify(isolate, obj);
647692
}
648693

694+
// First, find the thread while briefly holding the map mutex (lookups must not race with insert/erase)
695+
// Then lock only that specific thread's mutex (not the global map mutex)
696+
ThreadInfo *thread_info_ptr = nullptr;
649697
{
650-
std::lock_guard<std::mutex> lock(threads_mutex);
698+
std::lock_guard<std::mutex> map_lock(threads_map_mutex);
651699
auto found = threads.find(isolate);
652700
if (found != threads.end()) {
653-
auto &thread_info = found->second;
654-
thread_info.poll_state = std::move(poll_state);
701+
thread_info_ptr = &found->second;
702+
}
703+
}
655704

656-
if (enable_last_seen) {
657-
thread_info.last_seen = duration_cast<milliseconds>(
658-
GetUnbiasedMonotonicTime().time_since_epoch());
659-
} else {
660-
thread_info.last_seen = milliseconds::zero();
661-
}
705+
// Update thread-specific data with only the thread's mutex held
706+
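// Taking this after the map mutex is released means the two locks are never held together here
// NOTE(review): thread_info_ptr is valid only while this entry is not erased by Cleanup — confirm they cannot race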
707+
if (thread_info_ptr != nullptr) {
708+
std::lock_guard<std::mutex> thread_lock(thread_info_ptr->mutex);
709+
thread_info_ptr->poll_state = std::move(poll_state);
710+
711+
if (enable_last_seen) {
712+
thread_info_ptr->last_seen = duration_cast<milliseconds>(
713+
GetUnbiasedMonotonicTime().time_since_epoch());
714+
} else {
715+
thread_info_ptr->last_seen = milliseconds::zero();
662716
}
663717
}
664718
}
@@ -671,12 +725,19 @@ void GetThreadsLastSeen(const FunctionCallbackInfo<Value> &args) {
671725
milliseconds now = duration_cast<milliseconds>(
672726
GetUnbiasedMonotonicTime().time_since_epoch());
673727
{
674-
std::lock_guard<std::mutex> lock(threads_mutex);
728+
std::lock_guard<std::mutex> map_lock(threads_map_mutex);
675729
for (const auto &[thread_isolate, info] : threads) {
676-
if (info.last_seen == milliseconds::zero())
730+
// Lock each thread's mutex briefly to read last_seen
731+
milliseconds last_seen;
732+
{
733+
std::lock_guard<std::mutex> thread_lock(info.mutex);
734+
last_seen = info.last_seen;
735+
}
736+
737+
if (last_seen == milliseconds::zero())
677738
continue; // Skip threads with no recorded last_seen (never polled, or polled with last-seen tracking disabled)
678739

679-
int64_t ms_since = (now - info.last_seen).count();
740+
int64_t ms_since = (now - last_seen).count();
680741
result
681742
->Set(isolate->GetCurrentContext(),
682743
String::NewFromUtf8(isolate, info.thread_name.c_str(),

0 commit comments

Comments
 (0)