mysql/stonedb-聚合的遍历处理
  TEZNKK3IfmPf 2023年11月14日 17 0

记录聚合时遍历的处理,主要是 tuple_left 的赋值与读取,并进一步分析其对下一次遍历的影响。

核心处理:

调用堆栈:

(gdb) bt
#0 Tianmu::core::Filter::Block::Reset (this=0x7fc0823a0c40, n1=0, n2=3) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/filter_block.cpp:115
#1 0x0000000002d47317 in Tianmu::core::Filter::ResetBetween (this=0x7fc1449425a0, b1=29, n1=0, b2=29, n2=3)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/filter.cpp:374
#2 0x0000000002df1899 in Tianmu::core::Filter::ResetDelayed (this=0x7fc1449425a0, b=29, pos=32) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/filter.h:201
#3 0x0000000002df1759 in Tianmu::core::Filter::ResetDelayed (this=0x7fc1449425a0, n=1900576) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/filter.h:67
#4 0x0000000003007764 in Tianmu::core::GroupByWrapper::TuplesReset (this=0x7fc215720220, pos=1900576)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/groupby_wrapper.h:162
#5 0x0000000003003f03 in Tianmu::core::AggregationAlgorithm::AggregatePackrow (this=0x7fc215720580, gbw=..., mit=0x7fc21571fee0, cur_tuple=1900576)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/aggregation_algorithm.cpp:568
#6 0x000000000300279b in Tianmu::core::AggregationAlgorithm::MultiDimensionalGroupByScan (this=0x7fc215720580, gbw=..., limit=@0x7fc215720208: 7422784, offset=@0x7fc215720608: 0, sender=0x0,
limit_less_than_no_groups=false) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/aggregation_algorithm.cpp:288
#7 0x0000000003001fa3 in Tianmu::core::AggregationAlgorithm::Aggregate (this=0x7fc215720580, just_distinct=false, limit=@0x7fc215720600: -1, offset=@0x7fc215720608: 0, sender=0x0)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/aggregation_algorithm.cpp:202
#8 0x0000000002dee976 in Tianmu::core::TempTable::Materialize (this=0x7fc14492c7a0, in_subq=false, sender=0x7fc14492c6b0, lazy=false)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/temp_table.cpp:1972
#9 0x0000000002d36ec0 in Tianmu::core::Engine::Execute (this=0x7c57ec0, thd=0x7fc144000c00, lex=0x7fc144002f28, result_output=0x7fc14401e220, unit_for_union=0x0)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/engine_execute.cpp:426
#10 0x0000000002d36062 in Tianmu::core::Engine::HandleSelect (this=0x7c57ec0, thd=0x7fc144000c00, lex=0x7fc144002f28, result=@0x7fc215720d18: 0x7fc14401e220, setup_tables_done_option=0,
res=@0x7fc215720d14: 0, optimize_after_tianmu=@0x7fc215720d0c: 1, tianmu_free_join=@0x7fc215720d10: 1, with_insert=0)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/core/engine_execute.cpp:232
#11 0x0000000002e1e97f in Tianmu::dbhandler::TIANMU_HandleSelect (thd=0x7fc144000c00, lex=0x7fc144002f28, result=@0x7fc215720d18: 0x7fc14401e220, setup_tables_done_option=0, res=@0x7fc215720d14: 0,
optimize_after_tianmu=@0x7fc215720d0c: 1, tianmu_free_join=@0x7fc215720d10: 1, with_insert=0)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/tianmu/handler/ha_rcengine.cpp:82
#12 0x000000000246221a in execute_sqlcom_select (thd=0x7fc144000c00, all_tables=0x7fc14401aff8) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/sql/sql_parse.cc:5182
#13 0x000000000245b59e in mysql_execute_command (thd=0x7fc144000c00, first_level=true) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/sql/sql_parse.cc:2831
#14 0x00000000024631e3 in mysql_parse (thd=0x7fc144000c00, parser_state=0x7fc215721eb0) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/sql/sql_parse.cc:5621
#15 0x000000000245847b in dispatch_command (thd=0x7fc144000c00, com_data=0x7fc215722650, command=COM_QUERY) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/sql/sql_parse.cc:1495
#16 0x00000000024573a7 in do_command (thd=0x7fc144000c00) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/sql/sql_parse.cc:1034
#17 0x0000000002589f7d in handle_connection (arg=0x8d458b0) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/sql/conn_handler/connection_handler_per_thread.cc:313
#18 0x0000000002c6dbae in pfs_spawn_thread (arg=0x8d1b890) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-75-131-20220805/storage/perfschema/pfs.cc:2197
#19 0x00007fc21f3a1ea5 in start_thread () from /lib64/libpthread.so.0
#20 0x00007fc21d7d8b0d in clone () from /lib64/libc.so.6

核心函数:

/// Aggregates the packrow that `mit` currently points at, starting at tuple
/// index `cur_tuple`.
///
/// The function first tries to settle the packrow on the rough level
/// (AggregateRough); only if that is not enough does it walk the packrow row
/// by row, looking up / creating the group of every tuple that is still
/// marked in the wrapper and feeding the aggregated columns.
///
/// @param gbw        group-by wrapper; its tuple bookkeeping, groups and
///                   omitted-packrow counters are updated here.
/// @param mit        iterator positioned on the packrow; it is advanced past
///                   the rows that were handled.
/// @param cur_tuple  index of the first tuple of this packrow; the packrow
///                   covers [cur_tuple, cur_tuple + GetPackSizeLeft() - 1].
/// @return 0 on success (including a roughly omitted packrow),
///         1 when aggregation is finished and no more rows are needed,
///         2 when the connection was killed,
///         5 when the wrapper reports no tuples left in this packrow.
int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple) {
  int64_t packrow_length = mit->GetPackSizeLeft();
  if (!gbw.AnyTuplesLeft(cur_tuple, cur_tuple + packrow_length - 1)) {
    // Nothing in this packrow is still pending: skip it entirely.
    mit->NextPackrow();
    return 5;
  }
  int64_t uniform_pos = common::NULL_VALUE_64;
  bool skip_packrow = false;
  bool packrow_done = false;
  bool part_omitted = false;
  bool stop_all = false;
  bool aggregations_not_changeable = false;

  bool require_locking_ag = true;  // a new packrow, so locking will be needed
  bool require_locking_gr = true;  // do not lock if the grouping row is uniform

  // Both flags are consumed right here and cleared, so the conditional
  // LockPack branches inside the row loop below are not taken in this version.
  if (require_locking_gr) {
    for (int gr_a = 0; gr_a < gbw.NumOfGroupingAttrs(); gr_a++)
      gbw.LockPackAlways(gr_a, *mit);  // note: ColumnNotOmitted checked inside;
                                       // this loads and decompresses the
                                       // data packs of the GROUP BY columns
    require_locking_gr = false;
  }
  if (require_locking_ag) {
    for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++)
      gbw.LockPackAlways(gr_a, *mit);  // note: ColumnNotOmitted checked inside
    require_locking_ag = false;
  }

  gbw.ResetPackrow();
  int64_t rows_in_pack = gbw.TuplesLeftBetween(cur_tuple, cur_tuple + packrow_length - 1);
  DEBUG_ASSERT(rows_in_pack > 0);

  // All three values are int64_t; print them as long long so the format
  // specifiers match the argument types.
  TIANMU_LOG(LogCtl_Level::INFO, "AggregatePackrow cur: %lld rows_in_pack: %lld packrow_length: %lld",
             static_cast<long long>(cur_tuple), static_cast<long long>(rows_in_pack),
             static_cast<long long>(packrow_length));

  skip_packrow = AggregateRough(gbw, *mit, packrow_done, part_omitted, aggregations_not_changeable, stop_all,
                                uniform_pos, rows_in_pack, factor);
  if (t->NumOfObj() + gbw.NumOfGroups() == gbw.UpperApproxOfGroups()) {  // no more groups!
    gbw.SetAllGroupsFound();
    if (skip_packrow)  // no aggr. changeable and no new groups possible?
      packrow_done = true;
    if (gbw.NumOfGroupingAttrs() == gbw.NumOfAttrs()  // just DISTINCT without grouping
        || stop_all) {                                // or aggregation already done on rough level
      gbw.TuplesResetAll();                           // no more rows needed, just produce output
      return 1;                                       // aggregation finished
    }
  }
  if (skip_packrow)
    gbw.packrows_omitted++;
  else if (part_omitted)
    gbw.packrows_part_omitted++;
  if (packrow_done) {  // This packrow will not be needed any more
    gbw.TuplesResetBetween(cur_tuple, cur_tuple + packrow_length - 1);
  }

  if (packrow_done || skip_packrow) {
    mit->NextPackrow();
    return 0;  // success - roughly omitted
  }

  // bool require_locking_ag = true; // a new packrow,
  // so locking will be needed bool require_locking_gr = (uniform_pos ==
  // common::NULL_VALUE_64); // do not lock if the grouping row is uniform

  while (mit->IsValid()) {  // becomes invalid on pack end
    // NOTE(review): this exit skips gbw.CommitResets(); presumably harmless
    // because the query is being aborted - confirm.
    if (m_conn->Killed()) return 2;  // killed
    if (gbw.TuplesGet(cur_tuple)) {
      if (require_locking_gr) {
        for (int gr_a = 0; gr_a < gbw.NumOfGroupingAttrs(); gr_a++)
          gbw.LockPack(gr_a, *mit);  // note: ColumnNotOmitted checked inside
        require_locking_gr = false;
      }

      int64_t pos = 0;
      bool existed = true;
      if (uniform_pos != common::NULL_VALUE_64)  // either uniform because of KNs, or = 0,
                                                 // because there is no grouping columns
        pos = uniform_pos;                       // existed == true, as above
      else {
        for (int gr_a = 0; gr_a < gbw.NumOfGroupingAttrs(); gr_a++) {
          if (gbw.ColumnNotOmitted(gr_a)) {
            gbw.PutGroupingValue(gr_a, *mit);
          }
        }

        existed = gbw.FindCurrentRow(pos);
      }

      if (pos != common::NULL_VALUE_64) {  // Any place left? If not, just omit
                                           // the tuple.
        gbw.TuplesReset(cur_tuple);        // internally delayed for optimization
                                           // purposes - must be committed at the end
        if (!existed) {
          aggregations_not_changeable = false;
          gbw.AddGroup();                                                        // successfully added
          if (t->NumOfObj() + gbw.NumOfGroups() == gbw.UpperApproxOfGroups()) {  // no more groups!
            gbw.SetAllGroupsFound();
            if (gbw.NumOfGroupingAttrs() == gbw.NumOfAttrs()) {  // just DISTINCT without grouping
              gbw.TuplesResetAll();                              // no more rows needed, just produce output
              return 1;                                          // aggregation finished
            }
          }
        }
        if (!aggregations_not_changeable) {
          // Lock packs if needed
          if (require_locking_ag) {
            for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++)
              gbw.LockPack(gr_a,
                           *mit);  // note: ColumnNotOmitted checked inside
            require_locking_ag = false;
          }

          // Prepare packs for aggregated columns
          for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++)
            if (gbw.ColumnNotOmitted(gr_a)) {
              bool value_successfully_aggregated = gbw.PutAggregatedValue(gr_a, pos, *mit, factor);
              if (!value_successfully_aggregated) {
                gbw.DistinctlyOmitted(gr_a, cur_tuple);
              }
            }
        }
      }
    }
    cur_tuple++;
    mit->Increment();
    if (mit->PackrowStarted()) break;  // the iterator moved on to the next packrow
  }
  gbw.CommitResets();  // flush the delayed TuplesReset() calls made in the loop
  return 0;            // success
}
// Flushes whatever delayed operation is pending and clears the delayed-state
// markers. A pending delayed reset covers positions [0, delayed_stats] of
// block `delayed_block`; a pending delayed set covers [0, delayed_stats_set]
// of block `delayed_block_set`. A value of -1 in the block marker means
// "nothing pending".
void Filter::Commit() {
  // Pending delayed resets.
  if (delayed_block > -1) {
    const bool block_is_full = block_status[delayed_block] == FB_FULL;
    if (block_is_full && delayed_stats >= 0) {
      ResetBetween(delayed_block, 0, delayed_block, delayed_stats);
    }
    delayed_stats = -1;
    delayed_block = -1;
  }

  // Pending delayed sets.
  if (delayed_block_set > -1) {
    const bool block_is_empty = block_status[delayed_block_set] == FB_EMPTY;
    if (block_is_empty && delayed_stats_set >= 0) {
      SetBetween(delayed_block_set, 0, delayed_block_set, delayed_stats_set);
    }
    delayed_stats_set = -1;
    delayed_block_set = -1;
  }
}

重点逻辑:

// Excerpt of the key statements from AggregatePackrow (not a complete definition).
// rows_in_pack is obtained from the wrapper for the range
// cur_tuple .. cur_tuple + packrow_length - 1 (presumably inclusive) and is
// asserted to be positive before the packrow is processed.
_int64 rows_in_pack = gbw.TuplesLeftBetween(cur_tuple, cur_tuple + packrow_length - 1);
assert(rows_in_pack > 0);
// Per-tuple reset; forwards to tuple_left->ResetDelayed(pos) when tuple_left is set.
gbw.TuplesReset(cur_tuple);        // internally delayed for optimization
// Requests a delayed reset of position `pos` in the tuple_left filter.
// Does nothing when no tuple_left filter is present.
void TuplesReset(int64_t pos) {
  if (!tuple_left) return;
  tuple_left->ResetDelayed(pos);
}
void ResetDelayed(uint64_t n) { ResetDelayed((int)(n >> no_power), (int)(n & (pack_def - 1))); }
// Delayed reset of position `pos` inside block `b`.
//
// For a block that is still FB_FULL, consecutive resets starting at position 0
// are only counted in delayed_stats and applied later in one ResetBetween()
// call (see Commit()). As soon as a position arrives that does not continue
// the run, the accumulated prefix is flushed, the position is reset directly,
// and delayed_stats is set to -2 so that every later call for this block takes
// the direct path. For an FB_MIXED block the reset is applied immediately.
// Blocks in any other state are left untouched.
void ResetDelayed(unsigned int b, int pos) {
  DEBUG_ASSERT(b < no_blocks);
  DEBUG_ASSERT(delayed_block_set == -1);  // no mixing of delayed sets and delayed resets!

  if (block_status[b] == FB_MIXED) {
    Reset(b, pos);
    return;
  }
  if (block_status[b] != FB_FULL) return;

  // Switching to a different block: flush what was delayed for the old one
  // and start a fresh run.
  if (static_cast<size_t>(delayed_block) != b) {
    Commit();
    delayed_block = b;
    delayed_stats = -1;
  }

  if (pos == delayed_stats + 1) {
    // Continues the run 0..delayed_stats, just extend it.
    delayed_stats++;
    return;
  }

  if (pos > delayed_stats + 1) {  // a gap - we can't delay any more
    if (delayed_stats >= 0) ResetBetween(b, 0, b, delayed_stats);
    Reset(b, pos);
    delayed_stats = -2;  // not to use any longer
  }
  // pos <= delayed_stats: nothing to do
}
// Resets (clears) position `n` in block `b` and marks the block as changed.
//
// A block in state FB_FULL has no Block object behind it; it is described by
// block_last_one[b] only:
//   - clearing the last set position shrinks block_last_one[b] (or turns the
//     block into FB_EMPTY when that position is 0);
//   - clearing a position below the last one materializes a Block, switches
//     the state to FB_MIXED and then clears the position in it;
//   - a position above block_last_one[b] changes nothing.
// For a block that already has a Block object the position is cleared in it,
// and ResetBlock(b) is called when Block::Reset(n) returns true.
//
// Throws common::OutOfMemoryException when the block allocator returns null.
void Reset(unsigned int b,
           int n) {  // keep it inline - it's considerably faster
  int optimized_size = -1;
  block_changed[b] = 1;
  if (block_status[b] == FB_FULL) {
    if (n == block_last_one[b]) {
      if (n == 0)
        block_status[b] = FB_EMPTY;
      else
        block_last_one[b]--;
    } else if (n < block_last_one[b]) {  // else no change
      optimized_size = block_last_one[b];
      blocks[b] = block_allocator->Alloc();
      // Check the allocation BEFORE constructing into it; constructing a Block
      // at a null address would crash before the exception could be raised.
      if (blocks[b] == nullptr) throw common::OutOfMemoryException();
      // Only now is it safe to call the block mixed: a Block object exists.
      block_status[b] = FB_MIXED;
      if (b == no_blocks - 1)
        new (blocks[b]) Block(block_filter, no_of_bits_in_last_block,
                              true);  // block_filter->this// set as full,
                                      // then reset a part of it
      else
        new (blocks[b]) Block(block_filter, pack_def, true);  // block_filter->this
    }
  }
  if (blocks[b]) {
    if (blocks[b]->Reset(n)) ResetBlock(b);
    // The Block was created as completely full, but the FB_FULL block only
    // reached up to optimized_size; clear the tail above it unless the block
    // was full up to its last position anyway.
    if (optimized_size != -1 && (uint)optimized_size != (pack_def - 1) &&
        (b < no_blocks - 1 || optimized_size != no_of_bits_in_last_block - 1)) {
      blocks[b]->Reset(optimized_size + 1, (b < no_blocks - 1 ? (pack_def - 1) : no_of_bits_in_last_block - 1));
    }
  }
}
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月14日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2024年05月31日   25   0   0 mysql
  TEZNKK3IfmPf   2024年05月17日   52   0   0 sqlmysql
  TEZNKK3IfmPf   2024年05月31日   31   0   0 数据库mysql
  TEZNKK3IfmPf   2024年05月17日   49   0   0 查询mysql索引
  TEZNKK3IfmPf   2024年05月17日   50   0   0 jsonmysql
  TEZNKK3IfmPf   2024年05月17日   49   0   0 mysqlphp
  TEZNKK3IfmPf   2024年05月31日   27   0   0 数据库mysql
TEZNKK3IfmPf