mysql/stonedb-遍历元组数据时进行多线程拆解
  TEZNKK3IfmPf 2023年11月12日 17 0

逐行访问数据库中的元素涉及对磁盘IO的操作, 读取pack后又涉及解压和数据转换的操作,单线程处理时会产生大量的耗时。

一个简单的做法便是将数据拆分成不同的子集,然后利用多核CPU去处理不同的子集,最后将结果汇总。

本文分析这样的做法的一般性。

拆分成多线程处理面临的问题:

一. 需要有一个线程池任务处理模块

如果没有的话,那就需要写一个这样的多线程任务处理的模块,需要包含以下内容:

  1. 包含对要处理的任务的基本的数据结构的包装
  1. 一个任务必须可以独立的运行
  2. 任务与线程之间的关系,由线程池去消费任务
  1. 可以线程池中添加任务
  1. 任务与线程的模型为生产者与消费者
  2. 是否可以在任意时刻向线程池添加任务,一方面是需求驱动,不过更多的情况是受限于系统内原有的线程池模型
  1. 启动线程可以等待线程池处理完毕
  1. 类似于协程中的yield概念
  2. 目的在于不破坏原有的上下文的逻辑,同步的逻辑,在经过多线程的工作池处理后,返回调用处依然是同步的语义

二. 明确出临界区的范围

要明确出临界区的范围,原因就在于多线程的内存可见性,以及由此引发的数据安全的问题。

将原有的单个集合,拆分成不同的独立的子集,这就相当于打破了原有的数据间的交互逻辑。从上下文的场景看,可以分为以下几种类型的数据:

2.1 要遍历访问的元组数据

元组的数据是相对容易拆分的,因为访问的时候元组和元组间其实并没有交互,仅是按照顺序逐个访问。只要将整个表的元组,切分好起始位置, 划分成子集,就可以独立的访问每个子集元组的数据。

2.2 在遍历访问元组时的一些状态控制的数据

状态数据的目的一方面是为了统计,一方面是在遍历访问过程中对后序遍历做控制。

这些数据需要在具体场景中具体分析, 如果只是控制是否访问本pack,那么可以处理成任务内数据处理,而非全局的。

2.3 遍历访问元组获取的符合条件的结果

对元组访问结果的集合必然是一个全局的,但是对于结果集的处理有不同的做法:

  1. 在每个任务内先获取本任务的结果集,最后将所有任务的结果集合并
  2. 每个任务使用全局的结果集,但是在修改全局结果集时对数据加锁
  3. 使用线程安全的数据结构,每个任务向其中填充数据。(其实内部也是要加锁,区别在于加锁的范围)

第一个做法最为简单, 也没有锁的性能问题,但是,不足在于需要2倍的内存占用,以及内存拷贝的开销。不过在业务的测试中,内存拷贝相比其他的场景还可以接受(未对结果产生数量级影响)。

第二和第三个做法需要慎重,需要对底层的数据结构有深刻的理解,并且需要有实际的测试数据。

三. 理解底层模块间的数据关系, 避免在多线程处理时破坏底层的访问

听起来像是一句废话,如果不理解底层模块的数据的关系,那怎么能保证用多线程并行的去访问时不会产生破坏呢?

问题就在于要做到这一点并不容易, 模块间耦合严重的最大的恶果,就在于维护时无法独立的对某个模块进行维护,而必须将所有模块间的交互关系都理清。

要修改代码时,也无法单独的对特定的模块进行修改,必须将所涉及到的逻辑,经过重新设计后,一并修改。

以上还是基于工程化的设计思想上的,如果是基于数学模型实现的代码,那么可以说整个代码库是数学模型的一个具体实现,导致:

  1. 难以从代码推导出逻辑,必须首先理解其数学模型
  2. 修改时, 也必须从数学模型入手, 重新设计数学模型后,将其用代码实现

对顺序遍历元组进行多线程拆分的一般做法:

一. 必须读懂原有代码的逻辑

也是一句废话, 问题就在于考虑到代码的耦合程度,需要花费相当大的精力。必要时还必须理解AP系统的一些概念,和数据库系统的常规的数学模型。

2022-09-01 mysql/stonedb-遍历元组数据时进行多线程拆解

2022-09-01 mysql/stonedb-遍历元组数据时进行多线程拆解

2022-09-01 mysql/stonedb-遍历元组数据时进行多线程拆解

二. 提取出每个任务所必须的数据

包含起始行号,起始pack号, 以及存放每个任务处理结果的数据结构。

例如:

int packnum = 0;
int curtuple_index = 0;
std::unordered_map<int, int> pack2cur;
while (mit.IsValid()) {
pack2cur.emplace(std::pair<int, int>(packnum, curtuple_index));

int64_t packrow_length = mit.GetPackSizeLeft();
curtuple_index += packrow_length;
packnum++;
mit.NextPackrow();
}

pack2cur.emplace(std::pair<int, int>(packnum, curtuple_index));

int loopcnt = (packnum < m_threads) ? packnum : m_threads;
int mod = packnum % loopcnt;
int num = packnum / loopcnt;
for (int i = 0; i < loopcnt; ++i) {
res.insert(
ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::PrepShardingCopy, this, &mit, gb_main, &vGBW));

int pack_start = i * num;
int pack_end = 0;
int dwPackNum = 0;
if (i == (loopcnt - 1)) {
pack_end = packnum;
dwPackNum = packnum;
} else {
pack_end = (i + 1) * num - 1;
dwPackNum = pack_end + 1;
}

int cur_start = pack2cur[pack_start];
int cur_end = pack2cur[pack_end] - 1;

CTask tmp;
tmp.dwTaskId = i;
tmp.dwPackNum = dwPackNum;
tmp.dwStartPackno = pack_start;
tmp.dwEndPackno = pack_end;
tmp.dwStartTuple = cur_start;
tmp.dwEndTuple = cur_end;
tmp.dwTuple = cur_start;
tmp.dwPack2cur = &pack2cur;

vTask.push_back(tmp);
}

三. 在单独的任务内访问元组子集

前提是能正确的分割元组,以及处理临界区, 可以看个例子:

utils::result_set<void> res1;
for (uint i = 0; i < vTask.size(); ++i) {
if (dims.NoDimsUsed() == 0) dims.SetAll();
auto &mii = taskIterator.emplace_back(mit, true);
mii.SetTaskNum(vTask.size());
mii.SetTaskId(i);
}

for (size_t i = 0; i < vTask.size(); ++i) {
GroupByWrapper *gbw = i == 0 ? gb_main : vGBW[i].get();
res1.insert(ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[i],
&dims, &mit, &vTask[i], gbw, conn));
}
res1.get_all_with_except();
void AggregationWorkerEnt::TaskAggrePacks(MIIterator *taskIterator, [[maybe_unused]] DimensionVector *dims,
[[maybe_unused]] MIIterator *mit, [[maybe_unused]] CTask *task,
GroupByWrapper *gbw, Transaction *ci) {
taskIterator->Rewind();
int task_pack_num = 0;
while (taskIterator->IsValid()) {
if ((task_pack_num >= task->dwStartPackno) && (task_pack_num <= task->dwEndPackno)) {

int cur_tuple = (*task->dwPack2cur)[task_pack_num];
MIInpackIterator mii(*taskIterator);
AggregaGroupingResult grouping_result = aa->AggregatePackrow(*gbw, &mii, cur_tuple);
if (grouping_result == AggregaGroupingResult::AGR_FINISH) break;
if (grouping_result == AggregaGroupingResult::AGR_KILLED) throw common::KilledException();
if (grouping_result == AggregaGroupingResult::AGR_OVERFLOW ||
grouping_result == AggregaGroupingResult::AGR_OTHER_ERROR)
throw common::NotImplementedException("Aggregation overflow.");
}

taskIterator->NextPackrow();
++task_pack_num;
}
}

四. 合并每个任务的结果集

这也不是一个简单的事情, 在保证正确性的前提下, 避免内存拷贝.

具体的处理需要根据业务场景的不同做调整。

例子:

for (size_t i = 0; i < vTask.size(); ++i) {
// Merge aggreation data together
if (i != 0) {
aa->MultiDimensionalDistinctScan(*(vGBW[i]), mit);
gb_main->Merge(*(vGBW[i]));
}
}
void GroupByWrapper::Merge(GroupByWrapper &sec) {
int64_t old_groups = gt.GetNoOfGroups();
gt.Merge(sec.gt, m_conn);
if (tuple_left) tuple_left->And(*(sec.tuple_left));
packrows_omitted += sec.packrows_omitted;
packrows_part_omitted += sec.packrows_part_omitted;

// note that no_groups may be different than gt->..., because it is global
no_groups += gt.GetNoOfGroups() - old_groups;
}

参考:

《POSIX多线程编程》

《数据库系统实现》

《数据库查询优化器的艺术》

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月12日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2024年05月31日   25   0   0 mysql
  TEZNKK3IfmPf   2024年05月17日   50   0   0 sqlmysql
  TEZNKK3IfmPf   2024年05月31日   30   0   0 数据库mysql
  TEZNKK3IfmPf   2024年05月17日   49   0   0 查询mysql索引
  TEZNKK3IfmPf   2024年05月17日   50   0   0 jsonmysql
  TEZNKK3IfmPf   2024年05月17日   48   0   0 mysqlphp
  TEZNKK3IfmPf   2024年05月31日   27   0   0 数据库mysql
TEZNKK3IfmPf