Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
#pragma once

#include <atomic>
#include <condition_variable>
#include <mutex>

#include "constants.hpp"

Expand All @@ -26,6 +28,11 @@ class Sampler
// stopped or started in a straightforward manner without finer-grained control (locks)
std::atomic<uint64_t> thread_seq_num{ 0 };

// Thread exit synchronization - allows stop() to wait for the sampling thread to exit
std::atomic<bool> thread_running{ false };
std::mutex thread_exit_mutex;
std::condition_variable thread_exit_cv;

// This is a singleton, so no public constructor
Sampler();

Expand Down
31 changes: 29 additions & 2 deletions ddtrace/internal/datadog/profiling/stack/src/sampler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,9 @@ Sampler::adapt_sampling_interval()
void
Sampler::sampling_thread(const uint64_t seq_num)
{
// Mark thread as running
thread_running.store(true);

// Re-install SIGSEGV/SIGBUS handlers here, after Python initialization.
// The handlers may have been installed during static init, but Python or
// libraries (faulthandler, Django, FastAPI) can overwrite them afterwards.
Expand Down Expand Up @@ -206,6 +209,13 @@ Sampler::sampling_thread(const uint64_t seq_num)
// systems.
std::this_thread::sleep_until(sample_time_now + microseconds(sample_interval_us.load()));
}

// Signal that the thread is exiting
{
std::lock_guard<std::mutex> lock(thread_exit_mutex);
thread_running.store(false);
}
thread_exit_cv.notify_all();
}

void
Expand Down Expand Up @@ -234,6 +244,12 @@ Sampler::postfork_child()
// Clear stale task/greenlet entries from parent process.
// No lock needed: only one thread exists in child immediately after fork.

// The sampling thread from the parent doesn't exist in the child.
// Reset the flag so stop() doesn't wait for a non-existent thread.
thread_running.store(false);
new (&thread_exit_mutex) std::mutex();
new (&thread_exit_cv) std::condition_variable();

// Clear stale echion state (mutexes, maps) from parent process
if (echion) {
echion->postfork_child();
Expand Down Expand Up @@ -351,9 +367,12 @@ Sampler::start()
// We might as well get the default stack size and use that
rlimit stack_sz = {};
getrlimit(RLIMIT_STACK, &stack_sz);
if (create_thread_with_stack(stack_sz.rlim_cur, this, ++thread_seq_num) == 0) {
auto thread_id = create_thread_with_stack(stack_sz.rlim_cur, this, ++thread_seq_num);
if (thread_id == 0) {
return false;
}

pthread_detach(thread_id);
#else
try {
std::thread t(&Sampler::sampling_thread, this, ++thread_seq_num);
Expand All @@ -369,8 +388,16 @@ void
Sampler::stop()
{
    // Modifying the thread sequence number will cause the sampling thread to exit when it completes
    // a sampling loop.
    ++thread_seq_num;

    // Wait for the sampling thread to actually exit. The sampling thread clears thread_running while
    // holding thread_exit_mutex and then notifies thread_exit_cv; because the predicate below is
    // evaluated under the same mutex, an exit that happened before we started waiting is observed
    // immediately and spurious wakeups are ignored.
    // The wait is bounded so that a stuck sampling thread cannot make stop() hang forever.
    constexpr auto timeout = std::chrono::seconds(3);
    std::unique_lock<std::mutex> lock(thread_exit_mutex);
    const bool exited = thread_exit_cv.wait_for(lock, timeout, [this]() { return !thread_running.load(); });
    if (!exited) {
        // NOTE(review): nothing is forced at this point - this function only reports the timeout and
        // returns, leaving the sampling thread to exit on its own once it observes the new sequence number.
        // std::cerr is unit-buffered, so a plain newline is flushed without needing std::endl.
        std::cerr << "Failed to stop sampling thread after timeout, exiting forcefully." << '\n';
    }
}

void
Expand Down
Loading