본문 바로가기

C++

Using shared_ptr in multi-thread process

#include <iostream>
#include <memory>
#include <thread>
#include <atomic>
#include <vector>
#include <algorithm>
#include <chrono>
#include <sstream>

void WriteSomething(std::shared_ptr<std::atomic<int>> const _p_int) {
	std::stringstream ss;
	for (size_t i = 0; i < 4; ++i) {
		_p_int->fetch_add(1, std::memory_order_acquire);
		ss << std::this_thread::get_id() << " claims " << *_p_int << " use count: " << _p_int.use_count() << std::endl;
		std::cout << ss.str();
		ss.clear();
		std::this_thread::sleep_for(std::chrono::milliseconds(50));
	}
	std::cout << "Thread end!\n";
}

void ReadSomething(std::shared_ptr<std::atomic<int>> const _p_int) {
	std::stringstream ss;
	while (*_p_int < 16) {
		ss << "*p_cnt is " << *_p_int << std::endl;
		std::cout << ss.str();
		ss.clear();
		std::this_thread::sleep_for(std::chrono::milliseconds(20));
	}
}

int main()
{
	const size_t writers_num = 4;
	std::vector<std::thread> workers;
	auto p_cnter = std::make_shared<std::atomic<int>>(0);
	for (size_t i = 0; i < writers_num; ++i) {
		workers.push_back(std::thread(WriteSomething, p_cnter));
	}
	workers.push_back(std::thread(ReadSomething, p_cnter));
	std::for_each(workers.begin(), workers.end(), [](std::thread& _thread)->void {_thread.join(); });
	std::cout << "After threads end, use count: " << p_cnter.use_count() << " and p_cnter is "<<*p_cnter << std::endl;

	return 0;
}

Here is 4 threads to increment the data and 1 thread to read the data

use_count is increased during the threads are activated and it is decremented when the threads are finished.

 

On the other hands, what's happen when we use atomic<shared_ptr> instead of shared_ptr<atomic>?

#include <iostream>
#include <memory>
#include <thread>
#include <atomic>
#include <vector>
#include <algorithm>
#include <chrono>
#include <sstream>



void WriteSomething(std::atomic<std::shared_ptr<int>>& _p_int) {
	for (size_t i = 0; i < 40000; ++i) {
		*_p_int.load() += 1;
	}
}


int main()
{
	const size_t writers_num = 4;
	std::vector<std::thread> workers;
	std::atomic<std::shared_ptr<int>>* A;
	{
		auto p_cnter = std::atomic<std::shared_ptr<int>>(std::make_shared<int>(0));
		A = &p_cnter;
		std::cout << "Before threads starts, use count: " << p_cnter.load().use_count() << " and p_cnter is " << *p_cnter.load() << std::endl;
		for (size_t i = 0; i < writers_num; ++i) {
			workers.push_back(std::thread(WriteSomething, std::ref(p_cnter)));
		}
		std::for_each(workers.begin(), workers.end(), [](std::thread& _thread)->void {_thread.join(); });
		std::this_thread::sleep_for(std::chrono::milliseconds(50));
		std::cout << "After threads end, use count: " << p_cnter.load().use_count() << " and p_cnter is " << *p_cnter.load() << std::endl;
		std::cout << "In the scope, is shared_ptr alive? value: " << *A->load() << " use_count: " << A->load().use_count() << std::endl;
	}
	std::cout << "Out of the scope, is shared_ptr alive? value: " << *A->load() << " use_count: " << A->load().use_count() << std::endl;

	return 0;
}

 

We expected 160,000 because 4 threads increments 40,000 times but it didn't.

It is caused *_p_int.load() += 1; is not a atomic instruction.

It loads of the content of atomic<shared_ptr>, and copies its value, and adds one on it, and store it. When other thread access on it, it may cause deviant from our expectation.

 

 

 

Well, can we update(redefine) and read the shared_ptr simultaneously?

 

#include <iostream>
#include <memory>
#include <thread>

std::shared_ptr<int> g;

void read_g()
{
    long long sum = 0;
    for (int i = 0; i < 1000 * 1000; ++i)
    {
        auto x = g; // read
        if (x)
        {
            sum += *x;
        }
    }
    printf("sum = %lld\n", sum);
}

void write_g()
{
    for (int i = 0; i < 1000 * 1000; ++i)
    {
        auto n = std::make_shared<int>(42);
        g = n; // update
    }
}

int main()
{
    g = std::make_shared<int>(42);
    std::thread t1(read_g);
    std::thread t2(write_g);
    t1.join();
    t2.join();

    return 0;
}

Code of the above can be compiled but makes runtime error.(Access violation)

 

 

 

In this case, we can use atomic STL and .store() method (in C++ 20) to avoid access violation.

It changes the value of the data atomically even it is shared_ptr.

 

#include <iostream>
#include <memory>
#include <thread>
#include <atomic>
#include <vector>
#include <algorithm>
#include <chrono>
#include <sstream>



void WriteSomething(std::atomic<std::shared_ptr<int>>& _p_int) {
	std::stringstream ss;
	for (size_t i = 0; i < 4; ++i) {
		//_p_int->fetch_add(1, std::memory_order_acquire);
		_p_int.store(std::atomic<std::shared_ptr<int>>(std::make_shared<int>(
			*_p_int.load() + 1
			)));/*Load the data and update it.*/
		ss << std::this_thread::get_id() << " claims " << *_p_int.load() << " use count: " << _p_int.load().use_count() << std::endl;
		std::cout << ss.str();
		ss.clear();
		std::this_thread::sleep_for(std::chrono::milliseconds(50));
	}
	std::cout << "Thread end!\n";
}

void ReadSomething(std::atomic<std::shared_ptr<int>>& _p_int) {
	std::stringstream ss;
	while (*_p_int.load() < 5) {
		ss << "*p_cnt is " << *_p_int.load() << std::endl;
		std::cout << ss.str();
		ss.clear();
		std::this_thread::sleep_for(std::chrono::milliseconds(20));
	}
}

int main()
{
	const size_t writers_num = 4;
	std::vector<std::thread> workers;
	std::atomic<std::shared_ptr<int>>* A;
	{
		auto p_cnter = std::atomic<std::shared_ptr<int>>(std::make_shared<int>(0));
		A = &p_cnter;
		std::cout << "Before threads starts, use count: " << p_cnter.load().use_count() << " and p_cnter is " << *p_cnter.load() << std::endl;
		for (size_t i = 0; i < writers_num; ++i) {
			workers.push_back(std::thread(WriteSomething, std::ref(p_cnter)));
		}
		workers.push_back(std::thread(ReadSomething, std::ref(p_cnter)));
		std::for_each(workers.begin(), workers.end(), [](std::thread& _thread)->void {_thread.join(); });
		std::this_thread::sleep_for(std::chrono::milliseconds(50));
		std::cout << "After threads end, use count: " << p_cnter.load().use_count() << " and p_cnter is " << *p_cnter.load() << std::endl;
		std::cout << "In the scope, is shared_ptr alive? value: " << *A->load() << " use_count: " << A->load().use_count() << std::endl;
	}
	std::cout << "Out of the scope, is shared_ptr alive? value: " << *A->load() << " use_count: " << A->load().use_count() << std::endl;

	return 0;
}

Using .store() method of std::atomic, we can update the shared_ptr without any access violation.

However, but .load and .store cannot be unifed like .fetch_add() method so it may make sharing problem.

We expected its result is 16 because 4 threads increments 4 time but it doesn't work as expectation.

Futhermore, redefinition of shared_ptr needs huge cost. So, serious consideration is needed for reallocation of shared_ptr.

'C++' 카테고리의 다른 글

Using smart pointers and ownership  (0) 2022.05.15