c++的线程初探-2
  0eGysyk4Lrwg 2023年11月02日 123 0

(目录)


一、条件变量

条件变量是一种线程同步机制。当条件不满足时,相关线程被一直阻塞,直到某种条件出现,这些线程才会被唤醒。 C++11的条件变量提供了两个类:

  • condition_variable:只支持与普通mutex搭配,效率更高。
  • condition_variable_any:是一种通用的条件变量,可以与任意mutex搭配(包括用户自定义的锁类型)。

生产消费问题的经典案例:

#include <iostream>
#include <string>
#include <cstdlib>
#include <ctime>
#include <thread> //线程类头文件。
#include <mutex>  //互斤锁类的头文件
#include <deque>  //deque容器的头文件。
#include <queue>  //queue容器的头文件。
#include <condition_variable> //条件变量的头文件。

using namespace std;

class AA {
	mutex m_mutex;
	condition_variable m_cond;
	queue<string, deque<string>> m_q;
public:
	void incache(int num) { // 生产数据,num指定数据的个数
		lock_guard < mutex > lock(m_mutex); // 申请加锁。离开作用域的时候lock_guard会自动释放锁。
		for (int i = 0; i < num; ++i) {
			static int bh = 1; // 编号
			string message = to_string(bh++) + " data";
			m_q.push(message);
			cout << "producting thread: " << this_thread::get_id() << ", "
					<< message << endl;
		}
		// 唤醒一个被当前条件变量阻塞的消费线程
		//m_cond.notify_one();

		// 唤醒全部被当前条件变量阻塞的消费线程:让所有消费者线程都进来抢
		m_cond.notify_all();
	}

	void outchache() { // 消费数据
		while (true) {
			string message;
			{
				//cout << "consuming thread: " << this_thread::get_id() << ", "	<< "申请加锁" << endl;
				unique_lock < mutex > lock(m_mutex); // lock互斥锁在出作用域的时候也会释放
				//cout << "consuming thread: " << this_thread::get_id() << ", "	<< "加锁成功" << endl;

				// 挡住线程
				//this_thread::sleep_for(chrono::hours(1));
				// 存在条件变量虚假唤醒:消费者线程被唤醒后,缓存队列中没有数据。
				// 被虚假唤醒的消费者线程要通过while反复判断,然后再进入wait()。使用if语句达不到效果(错过时点)。
				while (m_q.empty())
					// 1. 把互斥锁m_mutex解锁 2. 阻塞,等待被唤醒。 3. 给互斥锁m_mutex加锁。
					m_cond.wait(lock);
				/**
				 * c++11为我们提供了与上述while循环等效的解决方案:
				 * m_cond.wait(lock, [this]{return !m_q.empty();});
				 */

				message = m_q.front();
				m_q.pop();
				cout << "consuming thread: " << this_thread::get_id() << ", "
						<< message << endl;
				//lock.unlock(); 手工解锁
			}

			// 模拟消费出队数据
			this_thread::sleep_for(chrono::milliseconds(1));
		}
	}
};

int main(int argc, const char **argv) {
	const int max = 8;
	srand(time(nullptr));

	AA aa;

	// 定义消费线程
	thread t1(&AA::outchache, &aa);
	thread t2(&AA::outchache, &aa);
	thread t3(&AA::outchache, &aa);

	this_thread::sleep_for(chrono::seconds(rand() % max));
	// 主线程生产数据
	aa.incache(6);

	// 主线程生产数据
	this_thread::sleep_for(chrono::seconds(rand() % max));
	aa.incache(8);

	// 回收线程资源
	t1.join();
	t2.join();
	t3.join();

	return 0;
}

普通的互斤锁为什么要转换成unique_lock之后才能用于条件变量呢?

unique_lock < mutex > lock(m_mutex);

template <class Mutex> class unique_lock 是模板类,模板参数为互斥锁类型。 unique_lock和lock_guard都是管理锁的辅助类,都是RAII风格(在构造时获得锁,在析构时释放锁)。它们的区别在于:为了配合condition_variable,unique_lock还有lock()和unlock()成员函数——而lock_guard没有。

二、wait()所做的工作

  1. 把互斥锁解锁
  2. 阻塞,等待被唤醒。
  3. 给互斥锁加锁。 如果wait()返回了,一定申请到了互斥锁。

三、原子类型atomic

c++11提供了atomic<T>模板类(结构体),用于支持原子类型,模板参数可以是bool、char、int、long、long long、指针类型(不支持浮点类型和自定义数据类型)——只支持整数。原子操作由CPU指令提供支持,它的性能比锁和消息传递更高,并且,不需要程序员处理加锁和释放锁的问题,支持修改、读取、交换、比较并交换等操作。 头文件:#include <atomic>

#include <iostream>
#include <thread> //线程类头文件。
#include <atomic>

using namespace std;

// 只支持整数
atomic<int> ga { 0 };
//atomic<int> ga(0);
//atomic_int ga(0);

void func() {
	for (int i = 0; i < 1000000; ++i)
		ga++;
}

int main(int argc, const char **argv) {
	thread t1(func);
	thread t2(func);

	t1.join();
	t2.join();

	cout << "ga = " << ga << endl;

	return 0;
}

另一个演示

#include <iostream>
#include <thread> //线程类头文件。
#include <atomic>

using namespace std;

int main(int argc, const char **argv) {
	atomic<int> a(3); // 转换函数
	cout << "a = " << a.load() << endl; // 读取原子变量a的值
	a.store(8); // 把8存储到原子变量中
	cout << "a = " << a.load() << endl; // 读取原子变量a的值

	int old;
	//用于存放原值
	old = a.fetch_add(5); //把原子变量a的值与5相加,返回原值
	cout << "old = " << old << ", a =" << a.load() << endl; // old=8, a=13
	old = a.fetch_sub(2); //把原子变量a的值减2,返回原值。
	cout << "old = " << old << ", a =" << a.load() << endl; // old=13, a=11

	atomic<int> ii(3); //原子变量
	int expect = 3; //期待值
	int val = 5; //打算存入原子变量的值
	//比较原子变量的值和预期值expect
	//如果当两个值相等,把val存储到原子变量中
	//小如果当两个值不相等,用原子变量的值更新预期值
	//执行存储操作时返回true,否则返回false。
	bool bret = ii.compare_exchange_strong(expect, val);
	cout << "bret=" << bret << endl;
	cout << "ii=" << ii << endl;
	cout << "expect=" << expect << endl;

	return 0;
}

比较原子变量的值和预期值expect,如果当两个值相等,把val存储到原子变量中,函数返回true;如果当两个值不相等,用原子变量的值更新预期值,函数返回false。CAS指令。

T compare_exchange_strong(T &expect,const T val)noexcept;
  • atomic<T> 模板类重载了整数操作的各种运算符。
  • atomic<T> 模板类的模板参数支持指针,但不表示它所指向的对象是原子类型。
  • 原子整型可以用作计数器,布尔型可以用作开关。
  • CAS指令是实现无锁队列的基础。

四、线程函数的多种形式

#include <iostream>
#include <thread> //线程类头文件。

using namespace std;

void f() {
	cout << "void f() .." << endl;
}
struct F {
	F(int a) :
			x(a) {
	}
	void operator()() {
		cout << "F::operator() " << x << endl;
	}
	void f() {
		cout << "member f " << x << endl;
	}
	int x;
};

int main(int argc, const char **argv) {
	thread t1(f);
	thread t2(F(20));
	thread t3([] {
		cout << "noname func" << endl;
	});
	F a(10);
	thread t4(F::f, &a); // 相当于有参的函数

	t1.join();
	t2.join();
	t3.join();
	t4.join();

	return 0;
}

五、引用的传递

引用的传递有点不同,如果不加处理,编译器无法判断我们是想传引用,还是想传值。 可以这样处理:

#include <iostream>
#include <thread> //线程类头文件。

using namespace std;

void f(int &x) {
	this_thread::sleep_for(chrono::seconds(1));
	cout << x << endl;
}

int main(int argc, const char **argv) {
	int x = 5;
	thread td(f, std::ref(x));
	x = 20;
	td.join();

	return 0;
}

六、再议线程互斥

image.png

用锁原则总结:

  • 共享对象的操作要考虑加锁
  • 锁的颗粒度越小越好
  • 尽量不要用 mutex 裸装上阵,危险处处!
  • 如无特殊需要,用 lock_guard 即可
  • 如控制上满足不了要求(一般是粒度),改用unique_lock

七、局限于线程中的“全局变量” thread_local

#include <iostream>
#include <thread>

using namespace std;

thread_local int x = 0;

void g() {
	x++;
	cout << x << endl;
}

void f() {
	for (int i = 0; i < 10; i++)
		g();
}

int main(int argc, const char **argv) {
	thread t1(f);
	thread t2(f);
	t1.join();
	t2.join();
	cout << x << endl;

	return 0;
}

每个thread_local变量在线程创建的时候创建,然后对本线程而言,相当于全局变量;当线程销毁的时候,会同时销毁当初创建的这个thread_local变量。thread_local变量不是静态存储的。

七、c++11 future & promise

#include <iostream>
#include <thread>
#include <future>
#include <chrono>

using namespace std;

int func(int x, int y) {
	this_thread::sleep_for(chrono::seconds(5));
	return x * 100 + y;
}

int main(int argc, const char **argv) {
	// launch::async 表示创建新线程;
	// launch::deferred 表示延迟处理,同步调用,不创建新线程;
	// launch::async | launch::deferred 表示系统自己看着办
	future<int> res = async(launch::async, func, 3, 6); // 如果线程中抛出了异常,可以在主线程中抓住,
	cout << "do something else ..." << endl;
	int r = res.get(); // func计算未完成就阻塞在这里,直到func返回结果。
	cout << r << endl;

	return 0;
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

0eGysyk4Lrwg