Download:C/C++气象数据中心实战,手把手教你做工业级项目
c++线程间共享数据的问题
所有线程间共享数据的问题,都是修改数据导致的(竞争条件)。如果所有的共享数据都是只读的,就没问题,因为一个线程所读取的数据不受另一个线程是否正在读取相同的数据而影响。
避免有问题的竞争条件 1.用保护机制封装你的数据结构,以确保只有实际执行修改的线程能够在不变量损坏的地方看到中间数据。 2.修改数据结构的设计及其不变量,从而令修改作为一系列不可分割的变更来完成,每个修改均保留其不变量。者通常被称为无锁编程,且难以尽善尽美。
用互斥元保护数据 在清单3.1 用互斥元保护列表中,有一个全局变量,它被相应的std::mutex的全局实例保护。在add_to_list()以及list_contains()中对std::lock_guardstd::mutex的使用意味着这些函数中的访问是互斥的list_contains()将无法再add_to_list()进行修改的半途看到该表。
注意:一个迷路的指针或引用,所有的保护都将白费。在清单3.2 意外地传出对受保护数据的引用展示了这一个错误的做法。
发现接口中固有的竞争条件,这是一个粒度锁定的问题,就是说锁定从语句上升到接口了,书中用一个stack类做了一个扩展,详见清单3.5 一个线程安全栈的详细类定义
死锁:问题和解决方案:为了避免死锁,常见的建议是始终使用相同的顺序锁定者两个互斥元。 std::lock函数可以同时锁定两个或更多的互斥元,而没有死锁的风险。 常见的思路:
避免嵌套锁
在持有锁时,避免调用用户提供的代码
以固定顺序获取锁 这里有几个简单的事例:清单3.7 使用锁层次来避免死锁、清单3.9 用std::unique_lock灵活锁定
锁定在恰当的粒度 特别的,在持有锁时,不要做任何耗时的活动,比如文件的I/O。 一般情况下,只应该以执行要求的操作所需的最小可能时间而去持有锁。这也意味着耗时的操作,比如获取获取另一个锁(即便你知道它不会死锁)或是等待I/O完成,都不应该在持有锁的时候去做,除非绝对必要。 在清单3.10 在比较运算符中每次锁定一个互斥元虽然减少了持有锁的时间,但是也暴露在竞争条件中去了。
用于保护共享数据的替代工具 二次检测锁定模式,注意这个和单例模式中的饱汉模式不一样,它后面有对数据的使用
void undefined_behaviour_with_double_checked_locking()
{
if(!resource_ptr)
{
std::lock_guard<std::mutex> lk(resource_mutex);
if(!resource_ptr)
{
resoutce_ptr.reset(new some_resource);
}
}
resource_ptr->do_something();
}
它有可能产生恶劣的竞争条件,因为在锁外部的读取与锁内部由另一线程完成的写入不同步。这就因此创建了一个竞争条件,不仅涵盖了指针本身,还涵盖了指向的对象。
C++标准库提供了std::once_flag和std::call_once来处理这种情况。使用std::call_once比显示使用互斥元通常会由更低的开销,特别是初始化已经完成的时候,应优先使用。清单3.12 使用std::call_once的线程安全的类成员延迟初始化
保护很少更新的数据结构:例如DNS缓存,使用读写互斥元:单个“写”线程独占访问或共享,由多个“读”线程并发访问。 清单3.13 使用boost::share_mutex保护数据结构
main 函数的返回值为 int 类型,注意不要写成 void 类型。
C/C++气象数据中心实战
C/C++手把手教你做工业级项目
//std::accumulate的并行版本(来自清单2.8)
template <typename Iterator,typename T>
struct accumulate_block
{
void operator()(Iterator first,Iterator last,T& result)
{
result=std::accumulate(first,last,result);
}
};
template <typename Iterator,typename T>
T parallel_accumulate(Iterator first,Iterator last,T init)
{
unsigned long const length=std::distance(first,last);
if(!length) //如果输入的范围为空,只返回初始值init
return init;
unsigned long const min_per_thread=25; //最小块的大小
unsigned long const max_threads=(length+min_per_thread-1)/min_per_thread; //处理的元素数量除以最小块的大小,获取线程的最大数量
unsigned long const hardware_threads=std::thread::hardware_concurrency(); //真正并发运行的线程数量的指示
//要运行的线程数是你计算出的最大值的硬件线程数量的较小值。
unsigned long const num_threads=std::min(hardware_threads!=0?hardware_threads:2,max_threads);
//如果hardware_concurrency返回0,我们就替换成2,运行过多的线程,会在单核机器上变慢,过少会错过可用的并发
unsigned long const block_size=length/num_threads; //待处理的线程的条目数量是范围的长度除以线程的数量
std::vector<T> results(num_threads); //保存中间结果
std::vector<std::thread> threads(num_threads-1); //因为有一个线程(本线程)了所以少创建一个文档
//循环:1.递进block_end到当前块的结尾,2.并启动一个新的线程来累计此块的结果。3.下一个块的开始是这一个的结束
Iterator block_start=first;
for(unsigned long i = 0; i < (num_threads-1);++i)
{
Iterator block_end=block_start;
std::advance(block_end,block_size); ...1
threads[i]=std::thread(accumulate_block<Iterator,T>(),block_start,block_end,std::ref(results[i])); ...2
block_start=block_end; ...3
}
//这里是处理上面没有整除的掉block_size的剩下的部分
accumulate_block()(block_start,last,results[num_threads-1]);
//通过join等待所有计算的线程
std::for_each(threads.begin(),threads.end(),std::mem_fn(&std::thread::join));
//一旦累计计算出最后一个块的结果,调用accumulate将结果计算出来
return std::accumulate(results.begin(),results.end(),init);
}