找回密码
 立即注册
首页 业界区 业界 [第五届 OceanBase 数据库大赛] 决赛复盘 | 2025 全国 ...

[第五届 OceanBase 数据库大赛] 决赛复盘 | 2025 全国大学生计算机系统能力大赛

腥狩频 昨天 22:45
日期:2025.12.20(凌晨) 12.25

个人总结:

有段时间没有写blog了,自从ICPC退役之后,一刻也没有为iCPC哀悼,紧接着登场的是OB比赛。
印象中西安区域赛是10月中旬那里吧,基本上打完西安区域赛就到ob的比赛了。
决赛是22号的下午6点结束,距离比赛结束还有两天的时间,很不幸的是22号我有考试,并且这门课这学期从来没有看过,所以实际的code时间大概就剩下了今天这一整天了。能不能把接下来的优化做出来,感觉已经很不好说了,由于改动到了一个很基本的头文件,导致现在要等很久编译,索性来补一篇这场比赛的记录。
ob的比赛真的要比我想象中的要难不少,其实在暑假打完数据库的比赛后听说了ob的比赛有点想参加,但是听说特别难于是放弃。
但是有个队伍差一个人,是上一次数据库比赛的前一个队伍,他们问我要不要一起打,我直接狠狠答应。于是就有了非常痛苦煎熬的道路。
update:12.25
决赛排名23名,无缘全国前20了qwq。
perface:

参赛队伍是[登山],初赛排名全国排名第十,决赛排名23名。
初赛之所以排名高,是因为我们在比赛前就规划好了先把去年2024年的题给做了,保证每个题都起码有一个人做过。由于那段时间和ICPC的时间重合,所以基本上都是白天ICPC,晚上数据库走起,初赛负责的题目并不是太多。有一个时间差,导致我们初赛满分的很快,这里感谢队长,把rag和初赛ob赛题的内容给包了,让我们初赛通过的时间很快。
但是我觉得我个人非常感觉后悔的一件事就是,在初赛满分了之后,我去搞了一个minilsm的lab,中间提了两个PR就紧接着去ob决赛了。现在想来,为什么当时打算做这个mini-lsm,是因为觉得决赛可能会用到,但是没有过这种性能优化的经验,导致走了很多弯路,最终这个lsm其实并没有对我的优化起到帮助。(虽然提了人生中前两个PR让我有点小窃喜)。
最后决赛两天一夜的时间去实现缓存,但是却没有实现出来,说实在的打击挺大的,因为我个人觉得这个缓存的实现难度并不是太高,甚至我最开始不是使用的KVCache,而是直接套了个hashmap去搞,但是最终火焰图发现在走get_next_batch_with_cache的时候几乎都会走laod_more_data,也就代表着我的缓存几乎没有什么用。在还有几分钟就要结束的时候才发现到了这个,真的觉得自己是个傻b。尽管拼命去找为什么了,最后还是惨淡收场。
ok说的有点长了,这里说一下本人负责的题目:
初赛:(除去了简单题):alias + simple_sub_query + complex_sub_query + create_view
复赛:内核赛道:index_merge优化 + 构建ID列索引 + 下推topn优化 + 缓存dim_iter or token_iter (rag 赛道是一点没有看过)
初赛

由于距离初赛都过去了快2个月的时间,我对于代码也并不是印象太深刻了,所以在这里挖一个坑吧,以后后续说不定回来填。
决赛

首先大概说一下,决赛的内容就是让我们去改动seekdb的源码,对于一条sql语句(类似是这个样子,实际的筛选条件由测试脚本决定)
  1. SELECT docid_col, MATCH(fulltext_col) AGAINST('test') as _score
  2. FROM test_insert_en
  3. WHERE MATCH(fulltext_col) AGAINST('text')
  4.   and base_id in ('base_id_1', 'base_id_2', 'base_id_3')
  5.   and id < 1000
  6. ORDER BY _score DESC
  7. LIMIT 10;
复制代码
进行优化,QPS肯定越高越好。初始的QPS官方评测机跑出来是 14 QPS上下浮动。
这里先大概说明,以下四个优化大概最后能优化到QPS600+。
index_merge 优化

其实这个思路的来源是官方的决赛指导中,提出了一个二阶段index merge优化的思路,让我知道了原来还有索引合并这种东西。官方可能第一阶段没有研究的很好?因为是演示时拿的oceanbase,然后直接建立在了index merge的基础上给了我们一个优化思路,但是seekdb本身甚至跑不起来index merge。
于是我觉得,可以先尝试实现index merge,之后再考虑这个二阶段的事情。
然后翻阅了源码后发现,seekdb原生是支持or条件的index merge,并没有支持and条件的(也没有支持IN条件表达式)。所以我们的目的就是让seekdb去支持and条件下的index merge。其实这部分可以去看oceanbase的源码,但是我当时脑子其实并没有转过来,因为发现ob的源码里是对于index merge and的情况去搞了一个index_merge_and_iter,我觉得有点麻烦,索性就直接自己搞了。
这里首先说明的是,在join_order.cpp里面,做了一些关于index_merge_or的一些情况。这个文件主要是用来进行一些路径的创建。
这里有一个大概的执行顺序(我也不是特别的确定):对于我们要走的index merge方法,优化器会先生成一条index merge的路径,然后根据这个路径去创建逻辑计划树,再然后是物理执行树,然后会是一个迭代器树,主要的代码运行也基本都是耗在这个迭代器树上,会进行各种扫描。
然后我们会发现,join_order.cpp里面,有一些关于index_merge_or的内容,但是缺少了and的情况。
所以我们就是要在这里进行修改。
  1. create_access_paths -> create_index_merge_access_paths -> get_candi_index_merge_trees       ->  get_valid_index_merge_indexes
  2.                                                                                             ->  generate_candi_index_merge_trees
  3.                                                           !candi_index_trees.empty()        ->  check_candi_index_trees_match_hint
  4.                                                                                             ->  ......
  5.                                                                                             ->  get_valid_index_merge_indexes
  6.                                                                                             ->  generate_candi_index_merge_trees
  7.                                                                                             ->  prune_candi_index_merge_trees
  8.                                                        -> do_create_index_merge_paths       ->  choose_best_selectivity_branch
  9.                                                                                             ->  root_node->formalize_index_merge_tree()
  10.                                                                                             ->  build_access_path_for_scan_node
  11.                                                                                             ->  create_one_index_merge_path
  12.                                                                                             ->  ......
  13.                                                                                             ->  access_paths.push_back(static_cast(index_merge_path)))
  14.                                                        -> check_index_merge_paths_contain_fts
  15.                       (!is_match_hint && !contain_fts) -> prune_index_merge_path
复制代码
这里是我当时做的时候列出来的一个函数的执行过程。
其中generate_candi_index_merge_trees 这个函数是用来构建一些候选的index_merge_tree。
这里有一个小地方就是:我们可以通过简单地打日志的办法,发现对于filter来说,如果支持了这种or的情况,那么多半filter的count()就是1,ob会对于这种情况,也就是filter搞出来一个树,这个树本身也是为了支持or条件而搞出来了的。那些or条件的内容都是filter的children。
那如果我们全是and的情况,ob并不会搞出来一个树,而就是一个个节点本身,也就是说,如果我们的谓词筛选是: where a < 1 and b > 1, 这种情况filter的count()就是2。
所以我的代码修改就是在这个函数里,加上对于and情况的支持:
  1.        // 检查所有filters是否都是可索引的简单条件
  2.        ObSEArray<ObIndexMergeNode*, 4> valid_nodes;
  3.        ObSEArray<bool, 4> valid_flags;
  4.       
  5.        for (int64_t i = 0; OB_SUCC(ret) && i < filters.count(); ++i) {
  6.          ObRawExpr *filter = filters.at(i);
  7.          if (OB_ISNULL(filter) ||
  8.              filter->get_expr_type() == T_OP_OR ||
  9.              filter->get_expr_type() == T_OP_AND) {
  10.            continue;
  11.          }
  12.          
  13.          ObIndexMergeNode *candi_node = NULL;
  14.          bool is_valid = false;
  15.          if (OB_FAIL(generate_candi_index_merge_node(ref_table_id,
  16.                                                     filter,
  17.                                                     valid_index_ids,
  18.                                                     valid_index_cols,
  19.                                                     candi_node,
  20.                                                     is_valid))) { //在这个函数里面我们也会做适当地修改
  21.            LOG_WARN("failed to generate node for simple filter", K(ret), KPC(filter));
  22.          } else if (is_valid && OB_NOT_NULL(candi_node)) {
  23.            valid_nodes.push_back(candi_node); //如果发现ok了,我们就放进去。
  24.            valid_flags.push_back(true);
  25.          } else {
  26.            valid_flags.push_back(false);
  27.          }
  28.        }
  29.            // 如果所有条件都可索引,则构建AND树
  30.        if (valid_flags.count() == filters.count() && valid_flags.count() > 1 && valid_nodes.count() > 1) {
  31.          ObIndexMergeNode *and_root = NULL; //这里我就十分暴力的去创建出来一个INderMergeNode root,去支持and的情况。
  32.          if (OB_NOT_NULL(and_root = OB_NEWx(ObIndexMergeNode, allocator_))) {
  33.            and_root->node_type_ = INDEX_MERGE_INTERSECT;
  34.            for (int64_t i = 0; i < valid_nodes.count(); ++i) {
  35.              if (OB_FAIL(and_root->children_.push_back(valid_nodes.at(i)))) {
  36.                LOG_WARN("failed to add child to AND root", K(ret));
  37.              }
  38.            }
  39.            adlog::DEBUG("and_root->node_type_ ",(int64_t)and_root->node_type_);
  40.            adlog::DEBUG("and_root->children_ count() ",and_root->children_.count());
  41.            if (OB_FAIL(and_root->formalize_index_merge_tree())) {
  42.              LOG_WARN("failed to formalize AND tree", K(ret));
  43.            } else {
  44.              if (OB_FAIL(candi_index_trees.push_back(and_root))) {
  45.                LOG_WARN("failed to push back AND tree", K(ret));
  46.              } else {
  47.                and_root->gen_code_ = 114514;
  48.                adlog::DEBUG("Added AND merge tree for ", valid_nodes.count(), " conditions");
  49.              }
  50.            }
  51.            adlog::DEBUG("and_root->node_type_ ",(int64_t)and_root->node_type_);
  52.            adlog::DEBUG("and_root->children_ count() ",and_root->children_.count());
  53.          }
  54.        }
复制代码
但是这里其实还有一点,就是ob本身其实时时刻刻都会有index_merge的情况。这应该是一些内部的优化吧,我并没有很深入了解。但是这种情况其实是和我们要创建index_merge_and的情况是冲突的。
这里就不得不提一下这里做法的弊端了:由于赛题本身是对混合查询做优化,所以这里index_merge的优化其实是很重要的。有一个关于计算cost的情况我并没有提及,是因为赛时时间过于紧张,个人期望是可以走index merge就会走index merge,所以cost方面我直接设定为0。这显然并不好,后续应当对这种情况进行完善。
接着说回来,对于后台时刻进行别的index merge的情况,这里对于ObIndexMergeNode,我创建了一个gen_code_,用来标记判断是后台的别的情况的index merge,还是我这边为了混合查询而做的index merge。(这里是用于进行后续的判断的)。
而对于generate_candi_index_merge_node这个函数,其实主要是修改
  1. else {
  2.      ObSEArray<uint64_t, 1> candicate_index_tids;
  3.      if (!get_tables().equal(filter->get_relation_ids())) {
  4.        is_valid_node = false;
  5.      } else if (!(filter->has_flag(IS_SIMPLE_COND) ||
  6.                   filter->has_flag(IS_RANGE_COND) ||
  7.                   filter->has_flag(CNT_MATCH_EXPR) ||
  8.                   (filter->has_flag(IS_IN) ))) {
  9.        is_valid_node = false;
复制代码
用来支持in的情况。
然后比较关键的还有这个:
  1. // ADVISE
  2. // 在这个函数里面,是对于 (已经构建好了AND树的and_root,去构建出来路径)
  3. int ObJoinOrder::do_create_index_merge_paths
复制代码
其中,这里我们主要是修改了原本对于INTERSECT的情况的判断,因为之前是对于INTERSECT的情况,我们是要选择出来一个最优解的执行的,而不是全都要,去跑index merge。
  1. int ObJoinOrder::choose_best_selectivity_branch(ObIndexMergeNode *&candi_node) {
  2.    adlog::DEBUG("ObJoinOrder::choose_best_selectivity_branch START");
  3.    int ret = OB_SUCCESS;
  4.    if (OB_ISNULL(candi_node)) {
  5.      ret = OB_ERR_UNEXPECTED;
  6.      LOG_WARN("get unexpected null", K(ret), KPC(candi_node));
  7.      adlog::DEBUG("choose_best_selectivity_branch:: get unexpected null");
  8.    } else if (INDEX_MERGE_UNION == candi_node->node_type_) {
  9.      adlog::DEBUG("choose_best_selectivity_branch:: INDEX_MERGE_UNION == candi_node->node_type_");
  10.      for (int64_t i = 0; OB_SUCC(ret) && i < candi_node->children_.count(); ++i) {
  11.        if (OB_FAIL(SMART_CALL(choose_best_selectivity_branch(candi_node->children_.at(i))))) {
  12.          LOG_WARN("failed to choose best branch for child", K(ret), KPC(candi_node->children_.at(i)));
  13.        }
  14.      }
  15.    } else if (INDEX_MERGE_INTERSECT == candi_node->node_type_) {
  16.      adlog::DEBUG("choose_best_selectivity_branch:: INDEX_MERGE_INTERSECT == candi_node->node_type_");
  17.      bool has_scan_children = true;  
  18.      // for (int64_t i = 0; i < candi_node->children_.count(); ++i) {
  19.      //   if (OB_NOT_NULL(candi_node->children_.at(i)) &&
  20.      //       (candi_node->children_.at(i)->node_type_ != INDEX_MERGE_SCAN and
  21.      //        candi_node->children_.at(i)->node_type_ != INDEX_MERGE_FTS_INDEX)) {
  22.      //     adlog::DEBUG("candi_node->children_.at(i)->node_type_ is error idx: ",i," type : ",(int64_t)candi_node->children_.at(i)->node_type_);
  23.      //     has_scan_children = false;
  24.      //     break;
  25.      //   }
  26.      // }
  27.      adlog::DEBUG("choose_best_selectivity_branch::  candi_node->children_.count() ",candi_node->children_.count());
  28.      adlog::DEBUG("choose_best_selectivity_branch::  has_scan_children ",has_scan_children);
  29.      bool go_index_merge_path_flag = true;
  30.      if(candi_node->gen_code_ != 114514) go_index_merge_path_flag = false;
  31.      if(go_index_merge_path_flag){
  32.        for (int64_t i = 0; OB_SUCC(ret) && i < candi_node->children_.count();++i) {
  33.          if
  34.          (OB_FAIL(SMART_CALL(choose_best_selectivity_branch(candi_node->children_.at(i)))))
  35.          {
  36.            LOG_WARN("failed to choose best branch for child", K(ret),
  37.            KPC(candi_node->children_.at(i)));
  38.          }
  39.        }
  40.      } else {
  41.        //......
  42.        //......
  43.    }
  44.    adlog::DEBUG("ObJoinOrder::choose_best_selectivity_branch END");
  45.    return ret;
  46. }
复制代码
这里就是对于我们之前的gen_code_去进行判断了,之所以说之前有冲突就是在这个地方。所以我们的gen_code_就派上用了,可以通过这个来区别开来,这个INTERSECT,到底是我们要跑的index merge and树,还是后台运行的别的内容。
大概是改完以上的内容后,可以跑通index_merge了,可以用explain检测一下。
但是大概率会出现Error的情况。因为关于实现index_merge_and,我们还需要修改index_merge_iter.cpp里面的内容。
之所以会出现Error的情况,是因为seekdb原生不支持index_merge_and,所以在ObDASIndexMergeIter::inner_init函数里面,对于INTERSECT的情况专门返回了ERROR。
所以我们改一下就好。
  1.     if (OB_UNLIKELY(merge_type_ != INDEX_MERGE_UNION and merge_type_ != INDEX_MERGE_INTERSECT)) {
  2.       ret = OB_INVALID_ARGUMENT;
  3.       LOG_WARN("invalid merge type", K(merge_type_));
  4.     } else
  5.       if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_ctx_, context_param))) {
  6.       LOG_WARN("failed to create index merge memctx", K(ret));
  7.     } else {
  8.       common::ObArenaAllocator &alloc = mem_ctx_->get_arena_allocator();
  9.       child_iters_.set_allocator(&alloc);
  10.           //....
复制代码
这样做了之后,我们的sql语句应该是会有返回结果的,但是召回率不出意外是有问题的。
这是因为intersect_get_next_rows函数内部实现的不对。
其实这里我们参考一下union_get_next_rows的做法就大概能知道,这里少了关于result_buffer_的应用。导致我们返回的行有了数据覆盖等问题。
所以我们应该仿照union_get_next_rows,去重写一下这个intersect_get_next_rows就好了。
值得注意的是:关于这个函数的返回条件了,我最开始并没有写“当有迭代器扫描结束后就直接使得ret = ITER_END”,导致测试的时候一直超时。所以这个点一定要写。
大概的实现内容:
  1. int ObDASIndexMergeIter::intersect_get_next_rows(int64_t &count, int64_t capacity)  
  2. {
  3.   int ret = OB_SUCCESS;
  4.   adlog::DEBUG("ObDASIndexMergeIter::intersect_get_next_rows start");
  5.   // {
  6.   //   adlog::DEBUG("INTERSECT rowkey_exprs_ count", K(rowkey_exprs_->count()));  
  7.   //   for (int64_t i = 0; i < rowkey_exprs_->count(); i++) {
  8.   //     // adlog::DEBUG("rowkey_expr", i, std::to_string(*rowkey_exprs_->at(i)));
  9.   //     {
  10.   //       char buf[1024];  
  11.   //       int64_t pos = 0;  
  12.   //       pos = rowkey_exprs_->at(i)->to_string(buf, sizeof(buf));  
  13.   //       adlog::DEBUG("rowkey_expr", i, std::string(buf, pos));
  14.   //     }
  15.   //   }
  16.   // }
  17.   
  18.   count = 0;  
  19.   result_buffer_.reuse();
  20.   
  21.   while (OB_SUCC(ret) && count < capacity) {  
  22.     /* try to fill each child store */  
  23.     int64_t output_idx = OB_INVALID_INDEX;  
  24.     int cmp_ret = 0;  
  25.     int64_t child_rows_cnt = 0;  
  26.       
  27.     for (int64_t i = 0; OB_SUCC(ret) && i < child_stores_.count(); i++) {
  28.       IndexMergeRowStore &child_store = child_stores_.at(i);
  29.       child_rows_cnt = 0;
  30.       if (!child_store.have_data()) {
  31.         if (!child_store.iter_end_) {
  32.           ObDASIter *child_iter = child_iters_.at(i);
  33.           if (OB_ISNULL(child_iter)) {
  34.             ret = OB_ERR_UNEXPECTED;
  35.             adlog::DEBUG("unexpected nullptr", K(i));
  36.           } else {
  37.             ret = child_iter->get_next_rows(child_rows_cnt, capacity);
  38.             if (OB_FAIL(ret)) {
  39.               if (OB_ITER_END != ret) {
  40.                 adlog::DEBUG("WARNING child_iter->get_next_rows(child_rows_cnt, capacity);");
  41.               }
  42.             }
  43.             adlog::DEBUG("child_iter: ",K(i),"  ", K(child_rows_cnt));
  44.             if (OB_ITER_END == ret && child_rows_cnt > 0) {
  45.               ret = OB_SUCCESS;
  46.             }
  47.             if (OB_SUCC(ret)) {
  48.               if (OB_FAIL(child_store.save(true, child_rows_cnt))) {
  49.                 adlog::DEBUG("WARNING failed to save child rows", K(child_rows_cnt), K(ret));
  50.               } else if (OB_FAIL(compare(i, output_idx, cmp_ret))) {
  51.                 adlog::DEBUG("WARNING index merge failed to compare row", K(i), K(output_idx), K(ret));
  52.               } else if (child_iter->get_type() == DAS_ITER_SORT) {
  53.                 adlog::DEBUG("i: ",i,"child_iter->get_type() == DAS_ITER_SORT" );
  54.                 reset_datum_ptr(child_iter->get_output(), child_rows_cnt);
  55.               }
  56.             } else if (OB_ITER_END == ret) {
  57.               child_store.iter_end_ = true;
  58.               ret = OB_SUCCESS;
  59.             } else {
  60.               adlog::DEBUG("WARNING failed to get next rows from child iter", K(ret));
  61.             }
  62.           }
  63.         }
  64.       } else if (OB_FAIL(compare(i, output_idx, cmp_ret))) {
  65.         adlog::DEBUG("WARNING index merge failed to compare row", K(i), K(output_idx), K(ret));
  66.       }
  67.     }
  68.     if (OB_FAIL(ret)) {
  69.       adlog::DEBUG("WARNING OB_FAIL(RET)");
  70.     } else if (output_idx == OB_INVALID_INDEX) {  
  71.       ret = OB_ITER_END;
  72.       adlog::DEBUG("ret = OB_ITER_END");
  73.     } else {   
  74.       bool all_matched = true;
  75.       for (int64_t i = 0; OB_SUCC(ret) && i < child_stores_.count(); i++) {
  76.         if (output_idx == i) {  
  77.           continue;
  78.         }
  79.         if ((!child_stores_.at(i).have_data())) {
  80.             all_matched = false;  // 这个child没有数据,不匹配
  81.             ret = OB_ITER_END;
  82.         } else if (OB_FAIL(compare(i, output_idx, cmp_ret))) {  
  83.           adlog::DEBUG("index merge failed to compare row", K(i), K(output_idx), K(ret));  
  84.         } else if (cmp_ret == 0) {  
  85.          
  86.         } else {
  87.           all_matched = false;  // rowkey不匹配  
  88.         }
  89.       }
  90.       if (OB_FAIL(ret)) {
  91.         adlog::DEBUG("OB_FAIL(ret)");
  92.         continue;
  93.       }
  94.       if (!all_matched) {
  95.         child_stores_.at(output_idx).cur_idx_++;  
  96.         continue;
  97.       }
  98.       for (int64_t i = 0; OB_SUCC(ret) && i < child_stores_.count(); i++) {
  99.         if (output_idx == i) {  
  100.           continue;
  101.         }
  102.         child_stores_.at(i).to_expr();
  103.       }
  104.       child_stores_.at(output_idx).to_expr();
  105.       if (OB_FAIL(save_row_to_result_buffer())) {  
  106.         adlog::DEBUG("failed to save row to result buffer", K(ret));
  107.       } else {  
  108.         count += 1;
  109.       }
  110.     }  
  111.   }
  112.   if(OB_FAIL(ret)){
  113.     if (ret == OB_ITER_END) {
  114.       adlog::DEBUG("intersect_get_next_rows : OB_FAIL(ret) ret == OB_ITER_END");
  115.     } else {
  116.       adlog::DEBUG("intersect_get_next_rows : OB_FAIL(ret)");
  117.     }
  118.   }
  119.   adlog::DEBUG("intersect_get_next_rows :",K(count));
  120.   
  121.   if (OB_ITER_END == ret && count > 0) {  
  122.     ret = OB_SUCCESS;  
  123.   }  
  124.   if (OB_SUCC(ret) && count > 0) {
  125.     if (OB_FAIL(result_buffer_.to_expr(count))) {
  126.       adlog::DEBUG("failed to convert result buffer to exprs", K(ret));  
  127.       LOG_WARN("failed to convert result buffer to exprs", K(ret));  
  128.     }  
  129.   }  
  130.   return ret;  
  131. }
复制代码
这里的排查内容意外的很辛苦,因为我一度怀疑过很多地方,例如什么rowkey不匹配,FTS返回的结果有问题,没有SORT等,最后发现都没有问题。只能说其实怀疑的这些地方,通过检测union的正确性,就可以不用浪费那么多时间,可能这也是为后面没有时间写出来缓存埋下伏笔了吧。
实现这个优化,QPS会从14到150那里。
构建ID列索引

这个其实反倒是没有注意到,因为我发现我的测试脚本最开始很有问题,官方后来修复了之后我并没有重新拉取,而且导致我的本地出现了一些莫名其妙的问题。例如,我这边本地测试的时候,id列其实是有索引的。但是我后来才知道原来并没有索引,这个在官方最后一周给出来了,通过这个优化,可以从150QPS到200QPS。
这里是队友写的了,我就直接贴代码了:
主要是在ObCreateTableResolver::resolve_table_elements中
  1.     if (OB_SUCC(ret)) {
  2.       // MySQL 模式下,AUTO_INCREMENT 列必须有索引。如果用户未手动为自增列建索引,
  3.       // 自动为该列创建一个普通(非唯一)索引,避免违反引擎约束。
  4.       if (lib::is_mysql_mode() && 0 != autoinc_column_id && !table_schema.is_external_table()) {
  5.         ObCreateTableStmt *create_table_stmt = static_cast<ObCreateTableStmt*>(stmt_);
  6.         ObColumnSchemaV2 *autoinc_col = NULL;
  7.         if (OB_ISNULL(create_table_stmt)) {
  8.           ret = OB_ERR_UNEXPECTED;
  9.           SQL_RESV_LOG(WARN, "unexpected null create_table_stmt when auto add index for autoinc", K(ret));
  10.         } else if (OB_ISNULL(autoinc_col =
  11.                     create_table_stmt->get_create_table_arg().schema_.get_column_schema(
  12.                         autoinc_column_id))) {
  13.           ret = OB_ERR_UNEXPECTED;
  14.           SQL_RESV_LOG(WARN, "failed to get autoinc column schema when auto add index",
  15.                        K(ret), K(autoinc_column_id));
  16.         } else if (autoinc_col->is_rowkey_column()) {
  17.           // 已经在主键/rowkey 上,无需额外索引
  18.         } else {
  19.           // 构造一个仅包含自增列的普通本地索引
  20.           HEAP_VARS_2((ObCreateIndexStmt, create_index_stmt), (ObPartitionResolveResult, resolve_result)) {
  21.             reset();
  22.             index_attributes_set_ = OB_DEFAULT_INDEX_ATTRIBUTES_SET;
  23.             index_arg_.reset();
  24.             sort_column_array_.reset();
  25.             store_column_names_.reset();
  26.             hidden_store_column_names_.reset();
  27.             index_keyname_ = NORMAL_KEY;
  28.             index_scope_ = NOT_SPECIFIED; // MySQL 默认本地索引
  29.             name_generated_type_ = GENERATED_TYPE_SYSTEM;
  30.             has_index_using_type_ = false;
  31.             ObColumnSortItem sort_item;
  32.             sort_item.column_name_ = autoinc_col->get_column_name_str();
  33.             sort_item.prefix_len_ = -1;
  34.             sort_item.order_type_ = common::ObOrderType::ASC;
  35.             ObString uk_name;
  36.             if (OB_FAIL(resolve_index_name(nullptr, sort_item.column_name_, false, uk_name))) {
  37.               SQL_RESV_LOG(WARN, "resolve auto index name failed", K(ret));
  38.             } else if (OB_FAIL(add_sort_column(sort_item))) {
  39.               SQL_RESV_LOG(WARN, "add auto index column failed", K(ret), K(sort_item));
  40.             } else if (OB_FAIL(generate_index_arg(false /*process_heap_table_primary_key*/))) {
  41.               SQL_RESV_LOG(WARN, "generate auto index arg failed", K(ret));
  42.             } else {
  43.               ObCreateIndexArg &create_index_arg = create_index_stmt.get_create_index_arg();
  44.               ObSArray<ObPartitionResolveResult> &resolve_results =
  45.                   create_table_stmt->get_index_partition_resolve_results();
  46.               ObSArray<obrpc::ObCreateIndexArg> &index_arg_list =
  47.                   create_table_stmt->get_index_arg_list();
  48.               index_arg_.index_key_ = static_cast<int64_t>(index_keyname_);
  49.               if (OB_FAIL(create_index_arg.assign(index_arg_))) {
  50.                 LOG_WARN("fail to assign auto index arg", K(ret));
  51.               } else if (OB_FAIL(resolve_results.push_back(resolve_result))) {
  52.                 LOG_WARN("fail to push back auto index resolve result", K(ret));
  53.               } else if (OB_FAIL(index_arg_list.push_back(create_index_arg))) {
  54.                 LOG_WARN("fail to push back auto index arg", K(ret));
  55.               }
  56.             }
  57.           }
  58.         }
  59.       }
  60.     }
复制代码
下推topn

其实本来是打算去实现官方说的二阶段优化的,但是我写到中间,发现太难实现了。需要改掉很多内容,路径,逻辑计划执行计划迭代器树等内容都需要更改。所以直接放弃了。
但是有个我们可以发现一个内容,就是去用explain,我们可以发现,主要是有两层,第一层是一个sort算子,里面内置了limit,去限制个数。第二层是我们目前已经改过了的index merge。
会发现其实有一个很耗时的内容,就是我们第二层扫描的结果很多,但是都需要返回到第一层再进行过滤,这个太慢了,我们完全可以在第二层的时候就把他过滤掉,毕竟我们是可以获取到这些数据的。耗时的一个关键点还有是因为我们在index merge的时候,那个result_buffer是进行拷贝操作的,所以也会有耗时内容。
那么我们要做什么呢?
首先我们是需要把limit给提取出来。但是注意,提取的并不是limit的value,而是limit_expr。
我一开始的时候没有考虑明白,把limit_value给记录了下来,放到了index_merge_ctdef里面,但是这样显然是不对的。毕竟这个ctdef是在构建执行计划的时候才会采用,但是我们之后再执行这样的语句的时候,其实并不会构建ctdef,而是会走缓存路线,只会构建出来rtdef。所以我们要做的,应该是把limit_expr给存到ctdef里面,然后每一次重新生成rtdef的时候,通过limit_expr把值算出来,再存到rtdef里面。
关于limit_expr的提取,我是在ObLogPlan::candi_allocate_order_by做的。
我的目的主要是,希望可以检测到,如果我们是这个sort+limit的形式,并且下层是index_merge的时候,把sort算子去掉,提取到它的limit_expr,然后放到我们的下层去。
  1.   } else {
  2.     for (int64_t i = 0; OB_SUCC(ret) && i < candidates_.candidate_plans_.count(); i++) {
  3.       bool is_reliable = false;
  4.       CandidatePlan candidate_plan = candidates_.candidate_plans_.at(i);
  5.       OPT_TRACE("generate order by for plan:", candidate_plan);
  6.       if (OB_FAIL(create_order_by_plan(candidate_plan.plan_tree_,
  7.                                        order_items,
  8.                                        topn_expr,
  9.                                        is_fetch_with_ties))) {
  10.         LOG_WARN("failed to create order by plan", K(ret));
  11.       } else if (NULL != topn_expr && OB_FAIL(is_plan_reliable(candidate_plan.plan_tree_,
  12.                                                                is_reliable))) {
  13.         LOG_WARN("failed to check if plan is reliable", K(ret));
  14.       } else if (is_reliable) {
  15.         ret = limit_plans.push_back(candidate_plan);
  16.       } else {
  17.         // 在这里试试看能不能把sort算子给去掉 advise
  18.         adlog::DEBUG("在这里试试看能不能把sort算子给去掉");
  19.         do {
  20.           int ret = OB_SUCCESS;
  21.           ObLogicalOperator *&top = candidate_plan.plan_tree_;
  22.           if (OB_ISNULL(top) || log_op_def::LOG_SORT != top->get_type()) {  
  23.             // 不是sort算子,无需处理  
  24.             adlog::DEBUG("不是sort算子啊");
  25.           } else {  
  26.             ObLogSort *sort_op = static_cast<ObLogSort*>(top);  
  27.             ObLogicalOperator *child = sort_op->get_child(0);  
  28.             if (OB_NOT_NULL(child) && log_op_def::LOG_TABLE_SCAN == child->get_type()) {
  29.               ObLogTableScan *table_scan = static_cast<ObLogTableScan *>(child);
  30.               if (table_scan->use_index_merge()) {
  31.                 // 使用的index merge
  32.                 adlog::DEBUG("下层用的是index merge");
  33.                 // get_limit_expr
  34.                 // auto && limit_expr = table_scan->get_limit_expr();
  35.                 if (sort_op->get_topn_expr() != nullptr) {
  36.                   // (limit_expr) = (sort_op->get_topn_expr());
  37.                   table_scan->set_limit_expr(sort_op->get_topn_expr());
  38.                   // 保留 Sort 算子用于精排(两阶段检索)
  39.                   sort_op->set_topn_expr(nullptr);
  40.                   adlog::DEBUG("把sort里面的limit_expr放到了当前的table_"
  41.                                "scan的limit_count_expr里了");
  42.                   table_scan->set_parent(nullptr);
  43.                 }
  44.                 // 保留 Sort 算子,不跳过
  45.                 top = child;
  46.               }
  47.               else {
  48.                 adlog::DEBUG("但是下层使用的不是index merge");
  49.               }
  50.             } else {
  51.               adlog::DEBUG("下层不是table scan啊");
  52.             }  
  53.           }
  54.          
  55.         }while(0);
  56.         ret = order_by_plans.push_back(candidate_plan);
  57.       }
复制代码
另外,当时其实并没有分析的很明白。
在ObSelectLogPlan::allocate_plan_top里也做了一遍。
  1.     // allocate root exchange
  2.     if (OB_SUCC(ret) && is_final_root_plan()) {
  3.       // allocate material if there is for update without skip locked.
  4.       // FOR UPDATE SKIP LOCKED does not need SQL-level retry, hence we don't need a MATERIAL to
  5.       // block the output.
  6.       if (optimizer_context_.has_no_skip_for_update()
  7.           && OB_FAIL(candi_allocate_for_update_material())) {
  8.         LOG_WARN("failed to allocate material", K(ret));
  9.         //allocate temp-table transformation if needed.
  10.       } else if (!get_optimizer_context().get_temp_table_infos().empty() &&
  11.                  OB_FAIL(candi_allocate_temp_table_transformation())) {
  12.         LOG_WARN("failed to allocate transformation operator", K(ret));
  13.       } else if (OB_FAIL(candi_allocate_root_exchange())) {
  14.         LOG_WARN("failed to allocate root exchange", K(ret));
  15.       } else {
  16.         LOG_TRACE("succeed to allocate root exchange", K(candidates_.candidate_plans_.count()));
  17.       }
  18.     }
  19.     // 在这里看能不能消除掉sort
  20.     if (OB_SUCC(ret)) {  
  21.       for (int64_t i = 0; OB_SUCC(ret) && i < candidates_.candidate_plans_.count(); i++) {
  22.         ObLogicalOperator *&top = candidates_.candidate_plans_.at(i).plan_tree_;
  23.         if (OB_ISNULL(top) || log_op_def::LOG_SORT != top->get_type()) {
  24.           continue;
  25.         }
  26.         ObLogicalOperator *child = top->get_child(0);
  27.         if (OB_ISNULL(child) or
  28.             log_op_def::LOG_TABLE_SCAN != child->get_type()) {
  29.           adlog::DEBUG("下层不是table scan啊");
  30.           continue;
  31.         }
  32.         ObLogTableScan *table_scan = static_cast<ObLogTableScan *>(child);
  33.         if (!table_scan->use_index_merge()) {
  34.           adlog::DEBUG("下层用的不是index merge 啊");
  35.           continue;
  36.         }
  37.         adlog::DEBUG("下层用的是index merge");
  38.         ObLogSort *sort_op = static_cast<ObLogSort *>(top);
  39.         
  40.         if (sort_op->get_topn_expr() != nullptr) {
  41.           table_scan->set_limit_expr(sort_op->get_topn_expr());
  42.           // 保留 Sort 算子用于精排(两阶段检索)
  43.           sort_op->set_topn_expr(nullptr);
  44.           adlog::DEBUG("把sort里面的limit_expr放到了当前的table_"
  45.                        "scan的limit_count_expr里了");
  46.           table_scan->set_parent(nullptr);
  47.         }
  48.         // 保留 Sort 算子,不跳过
  49.         top = child;
  50.       
  51.       }  
  52.     }
  53.   }
  54.   return ret;
  55. }
复制代码
但是我们做了以上内容后,explain发现,竟然上层的sort算子还是没有取消掉,明明明这里确实是提取出来了limit_expr。这是为什么呢?
是因为ObSelectLogPlan::candi_allocate_order_by_if_losted这个函数会进行一个补充,把丢失的order给弄回来。
所以这里我们也需要再特判一下:
  1. int ObSelectLogPlan::candi_allocate_order_by_if_losted(ObIArray<OrderItem> &order_items)
  2. {
  3.   int ret = OB_SUCCESS;
  4.   bool re_allocate_happened = false;
  5.   ObSEArray<CandidatePlan, 8> order_by_plans;
  6.   if (!order_items.empty()) {
  7.     candidates_.is_final_sort_ = true;
  8.     for (int64_t i = 0; OB_SUCC(ret) && i < candidates_.candidate_plans_.count(); i++) {
  9.       ObLogicalOperator *top = candidates_.candidate_plans_.at(i).plan_tree_;
  10.       CandidatePlan &plan = candidates_.candidate_plans_.at(i);
  11.       bool can_skip_sort = false;  
  12.       if (OB_NOT_NULL(top) && log_op_def::LOG_TABLE_SCAN == top->get_type()) {  
  13.         ObLogTableScan *table_scan = static_cast<ObLogTableScan*>(top);  
  14.         if (table_scan->use_index_merge() and table_scan->get_limit_expr() != nullptr) {  
  15.           can_skip_sort = true;  
  16.           adlog::DEBUG("这里标识跳过");  
  17.         }  
  18.       }  
  19.         
  20.       if (!can_skip_sort) {  
  21.         if (OB_FAIL(create_order_by_plan(plan.plan_tree_, order_items, NULL, false))) {  
  22.           LOG_WARN("failed to create order by plan", K(ret));  
  23.         }  
  24.       }  
  25.       // if (OB_FAIL(create_order_by_plan(plan.plan_tree_, order_items, NULL, false))) {
  26.       //   LOG_WARN("failed to create order by plan", K(ret));
  27.       // } else
  28.       if (OB_FAIL(order_by_plans.push_back(plan))) {
  29.         LOG_WARN("failed to push back", K(ret));
  30.       } else if (top != candidates_.candidate_plans_.at(i).plan_tree_) {
  31.         re_allocate_happened = true;
  32.       }
  33.     }
  34.     candidates_.is_final_sort_ = false;
  35.     if (OB_SUCC(ret) && re_allocate_happened) {
  36.       int64_t check_scope = OrderingCheckScope::CHECK_SET;
  37.       if (OB_FAIL(update_plans_interesting_order_info(order_by_plans, check_scope))) {
  38.         LOG_WARN("failed to update plans interesting order info", K(ret));
  39.       } else if (OB_FAIL(prune_and_keep_best_plans(order_by_plans))) {
  40.         LOG_WARN("failed to prune and keep best plans", K(ret));
  41.       } else { /*do nothing*/ }
  42.     }
  43.   }
  44.   return ret;
  45. }
复制代码
做了以上内容后,我们现在可以把上层的sort算子去掉了,并且把limit_expr给提取了table_scan->set_limit_expr(sort_op->get_topn_expr());里面。
接下来,在ObTableScanOp::init_attach_scan_rtdef内容中,对于这个刚才说的情况进行判断:
  1.     //
  2.     if (attach_ctdef->op_type_ == DAS_OP_INDEX_MERGE) {
  3.       auto && non_const_ctdef = const_cast<ObDASBaseCtDef*>(attach_ctdef);
  4.       auto &&merge_ctdef = static_cast<ObDASIndexMergeCtDef*>(non_const_ctdef);
  5.       ObDASIndexMergeRtDef *merge_rtdef = static_cast<ObDASIndexMergeRtDef*>(attach_rtdef);  
  6.       int64_t limit = 0;  
  7.       bool is_null = false;  
  8.       
  9.       LOG_INFO("INDEX_MERGE detected", "has_limit_expr", OB_NOT_NULL(merge_ctdef->limit_expr_));
  10.       
  11.       if (OB_NOT_NULL(merge_ctdef->limit_expr_) &&   
  12.           OB_FAIL(calc_expr_int_value(*merge_ctdef->limit_expr_, limit, is_null))) {
  13.         LOG_WARN("failed to calc limit expr", K(ret));
  14.       } else if (!is_null && limit > 0) {  
  15.         merge_rtdef->topn_limit_ = limit;  
  16.         merge_rtdef->enable_topn_pushdown_ = true;
  17.         LOG_INFO("TopN pushdown ENABLED in table_scan_op", K(limit));
  18.       } else {
  19.         merge_rtdef->enable_topn_pushdown_ = false;
  20.         LOG_INFO("TopN pushdown DISABLED in table_scan_op", K(is_null), K(limit));
  21.       }
  22.     }  
复制代码
这样,就在merge_rtdef里面成功的赋值了,我们现在剩下的任务,就是在index_merge_iter.cpp里面搞一个堆出来,使得我们可以在这一层成功的过滤掉数据。
首先是在inner_init函数里面加上以下内容:(有一些自己搞的初始化的内容先暂时别管)
  1.         if (OB_SUCC(ret)) {
  2.           if (OB_FAIL(result_buffer_.init(max_size_, eval_ctx_, output_, mem_ctx_->get_malloc_allocator()))) {
  3.             LOG_WARN("failed to init merge result buffer", K(ret));
  4.           } else {
  5.             if (merge_rtdef_->enable_topn_pushdown_) {
  6.               result_buffer_.enable_topn_ = true;
  7.               result_buffer_.topn_finish_ = false;
  8.               result_buffer_.first_flag_ = true;
  9.               result_buffer_.topn_limit_ = merge_rtdef_->topn_limit_ ;
  10.             } else {
  11.               result_buffer_.enable_topn_ = false;
  12.               result_buffer_.topn_finish_ = false;
  13.               result_buffer_.first_flag_ = true;
  14.             }
  15.           }
  16.         }
复制代码
然后稍加改造:
  1. int ObDASIndexMergeIter::save_row_to_result_buffer() {
  2.   int ret = OB_SUCCESS;
  3.   if (!result_buffer_.enable_topn_) {
  4.     // LOG_INFO("save_row: using normal add_row");
  5.     return result_buffer_.add_row();
  6.   }
  7.   // LOG_INFO("save_row: using TopN add_row_with_topn");
  8.   return result_buffer_.add_row_with_topn();
  9. }
复制代码
这里就不详细展示ObDASIndexMergeIter::MergeResultBuffer::add_row_with_topn()的内容了,就是简单地加入行和score,放到堆里面。
只是最后输出的内容,我们用first_flag进行了一个标记,当我们是index_merge第一次走完了的时候,就把堆里面的内容放到一个数组里面。然后之后我们如果再要输出结果的话,就直接从数组里面开始输出就好了。和堆就没有什么关系了。(代码中间删去了一些检测或者析构的内容)
  1. //advise
  2. int ObDASIndexMergeIter::MergeResultBuffer::to_expr_from_topn(int64_t &size, int64_t capacity)  
  3. {
  4.   int ret = OB_SUCCESS;
  5.   common::ObArray<TopNRow *> sorted_rows;
  6.   //...
  7.   if (first_flag_) {
  8.     adlog::DEBUG("开始转移 : topn_heap_->count(): ", topn_heap_->count());
  9.     out_arr_.reserve(topn_heap_->count());
  10.     while (topn_heap_->count() > 0) {
  11.       TopNRow *top = topn_heap_->top();
  12.       adlog::DEBUG("top: score: ", top->score_);
  13.       if (OB_FAIL(out_arr_.push_back(top))) {
  14.         adlog::DEBUG("failed to push  to topn_heap_out_", K(ret));
  15.         break;
  16.       }
  17.       topn_heap_->pop();
  18.     }
  19.     first_flag_ = false;
  20.     arr_cur_idx_ = -1;
  21.     std::sort(out_arr_.begin(), out_arr_.end(),
  22.     [](const TopNRow *a, const TopNRow *b) {
  23.       return a->score_ > b->score_;
  24.     });
  25.   }
  26.   if (arr_cur_idx_ == out_arr_.count() - 1) {
  27.     size = 0;
  28.     return ret;
  29.   }
  30.   int64_t begin_idx_ = arr_cur_idx_ + 1;
  31.   int64_t end_idx_ = std::min(arr_cur_idx_ + capacity + 1, out_arr_.count());
  32.   // Convert sorted rows to expressions in batch
  33.   {
  34.     ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_);
  35.     batch_info_guard.set_batch_size(end_idx_ - begin_idx_);
  36.    
  37.     for (int64_t i = begin_idx_; OB_SUCC(ret) && i < end_idx_; ++i) {
  38.       batch_info_guard.set_batch_idx(i);
  39.       TopNRow *row = out_arr_.at(i);
  40.       adlog::DEBUG("输出的分数 i: ", i , " socre: ", row->score_);
  41.       if (OB_ISNULL(row) || OB_ISNULL(row->row_data_)) {
  42.         ret = OB_ERR_UNEXPECTED;
  43.         LOG_WARN("unexpected null row", K(ret), K(i));
  44.       } else if (OB_FAIL(row->row_data_->to_expr<true>(*exprs_, *eval_ctx_))) {
  45.         LOG_WARN("failed to convert row to expr", K(ret), K(i));
  46.       }
  47.     }
  48.    
  49.     if (OB_SUCC(ret)) {
  50.       size = end_idx_ - begin_idx_;
  51.     }
  52.     // Clean up all rows
  53.     // ......
  54.     arr_cur_idx_ = end_idx_ - 1;
  55.   }
  56.   
  57.   return ret;
  58. }
复制代码
这样,就基本实现了topn下推到index merge层,QPS从200会提升到300这里。这也是我们队伍在本次比赛做的所有优化了。下面提到的缓存在思路上是完全可以实现的,所以也来稍微讲讲。
缓存token_iter(dim_iter)

其实这里,我个人认为,缓存dim_iter也好,还是缓存dim_iter里面的token_iter,都是一样的。毕竟本身都是为了把结果记录下来。
其实思路很简单,就是通过火焰图分析后,发现这个获取全文索引的分数和doc_id有点太慢了,我们这里做一个缓存,把数据都记录下来应该会快很多。
其实自己的思路有点乱七八糟,最开始的时候,只是想着,当这个迭代器扫描结束了之后,我们把扫描得到的数据存到一个hashmap里面,以后再有它一样token的迭代器要扫描的话,就直接用之前获取到的数据就好了。
但是这样实现了之后,发现这个缓存竟然没怎么起到作用。原因是因为竟然很多缓存迭代器不会扫到ITER_END。这里后续大概了解了下,貌似是ob本身对于这个有topn情况下对于全文检索的一种优化,并不会把迭代器所有的内容都扫描完。所以关于这个缓存是如何存放的就成了一个问题。
我后来想的做法就是,既然迭代器无法保证全扫描过,那么我直接强制让这个迭代器在第一次扫的时候,强制他走完,把结果存到缓存里面,这样应该就可以了。
从理论上来讲,应该是没有问题的,但是结果却发现返回ERROR。再细看,是发现貌似是底层的一个compare出错了,我并不是太理解为什么,虽然dim_iter和token_iter都是对于batch去搞,每一次自己的bathc结束了就会进行底层的扫描,但是我还是觉得强制扫描完了之后,走这个缓存也不会有什么问题。
但是很不幸的是错了,而且比赛迫在眉睫(还有就是我觉得这个貌似对于内存来讲不太友好),我不得不再想一个别的办法:就是我们对于一个token_iter来说,它每次读取多少数据我们是知道的,那么我们可以来判断一下,当前这个迭代器读取的数据,缓存是否有对应的数据,如果有的话,那么就直接读取缓存就好了。如果没有,那么就说明我们缓存的内容不够本次的read。那么就让存放缓存的那个迭代器继续走batch,直到缓存存放的数据够这个迭代器用了,就停下来。相当于就是做了一个按需存放缓存,这个做法对内存也应该会更加的友好。
  1. class DDManager {  
  2.   public:  
  3.     hash::ObHashMap<ObString, TokenCacheData*> cache_map_;  
  4.     hash::ObHashMap<ObString, ObTextRetrievalDaaTTokenIter*> origin_iter_map_;
  5.     hash::ObHashMap<ObString, int64_t> cnt_map_;
  6.     common::ObArenaAllocator allocator_;
  7.     int64_t map_max_size_{150};
  8.       
  9.   public:  
  10.   DDManager() : allocator_(ObMemAttr(MTL_ID(), "TokenCacheAlloc")) {
  11.       cache_map_.create(map_max_size_, "TokenCache", "TokenCache");
  12.       origin_iter_map_.create(map_max_size_,"Iter","TokenIter");
  13.       cnt_map_.create(map_max_size_,"aaaaa","bvbbbb");
  14.     }  
  15.       
  16.     ~DDManager() {  
  17.                 // ......
  18.     }  
  19.       
  20.     static DDManager& get_instance() {  
  21.       static DDManager instance;  
  22.       return instance;  
  23.     }  
  24.       
  25.     TokenCacheData* get_cache_data(const ObString &token) {  
  26.       TokenCacheData *cached_data = nullptr;  
  27.       cache_map_.get_refactored(token, cached_data);  
  28.       return cached_data;  
  29.     }  
  30.       
  31.     int create_cache_data(const ObString &token, TokenCacheData *&cached_data) {  
  32.       int ret = OB_SUCCESS;  
  33.       void *buf = allocator_.alloc(sizeof(TokenCacheData));  
  34.       if (OB_ISNULL(buf)) {  
  35.         ret = OB_ALLOCATE_MEMORY_FAILED;  
  36.       } else {  
  37.         cached_data = new(buf) TokenCacheData();  
  38.         ret = cache_map_.set_refactored(token, cached_data);  
  39.       }  
  40.       return ret;  
  41.     }
  42.   
  43.     ObTextRetrievalDaaTTokenIter* get_origin_iter(const ObString &token) {  
  44.       ObTextRetrievalDaaTTokenIter *cached_data = nullptr;  
  45.       origin_iter_map_.get_refactored(token, cached_data);  
  46.       return cached_data;  
  47.     }  
  48.       
  49.     int push_origin_iter(const ObString &token, ObTextRetrievalDaaTTokenIter *&cached_data) {  
  50.       int ret = OB_SUCCESS;  
  51.         
  52.       ret = origin_iter_map_.set_refactored(token, cached_data);  
  53.       
  54.       return ret;
  55.     }
  56.   
  57.     int create_token_iter(const ObString &token, const ObTextRetrievalScanIterParam &orig_param) {  
  58.       int ret = OB_SUCCESS;  
  59.   
  60.       // 创建新的param,使用Manager的分配器  
  61.       ObTextRetrievalScanIterParam new_param = orig_param;  
  62.       new_param.allocator_ = &allocator_;  // 替换分配器  
  63.   
  64.       // 创建新迭代器  
  65.       void *buf = allocator_.alloc(sizeof(ObTextRetrievalDaaTTokenIter));  
  66.       if (OB_ISNULL(buf)) {  
  67.         ret = OB_ALLOCATE_MEMORY_FAILED;  
  68.       } else {  
  69.         auto && new_iter = new(buf) ObTextRetrievalDaaTTokenIter();  
  70.         if (OB_FAIL(new_iter->init(new_param))) {  
  71.           LOG_WARN("failed to init token iter", K(ret));  
  72.         } else if (OB_FAIL(origin_iter_map_.set_refactored(token, new_iter))) {  
  73.           LOG_WARN("failed to store iter", K(ret));  
  74.         }  
  75.       }  
  76.       return ret;  
  77.     }
  78.   
  79.     // ....
  80.   };
复制代码
这是当时乱写的一个版本。
然后我们在ObTextRetrievalDaaTTokenIter里加上
  1. class ObTextRetrievalDaaTTokenIter final : public ObISRDaaTDimIter
  2. {
  3.   // ADVISE 加上以下内容
  4.   ADCacheData * own_cache_data_{nullptr};  
  5.   bool use_cache_{false};  
  6.   int64_t cache_read_idx_{-1};
  7.   
  8.   ObString token_name_;
  9.   int force_build_cache();
  10.   int init_with_cache();
  11.   bool init_with_cache_flag_{false};
复制代码
然后这是当时实现的一些函数:
  1. int ObTextRetrievalTokenIter::get_next_batch_with_cache(const int64_t capacity,
  2.                                                         int64_t &count) {
  3.   int ret = OB_SUCCESS;
  4.   count = 0;
  5.   int64_t available_count =
  6.   own_cache_data_->total_count_ - (cache_read_idx_ + last_read_count_ - 1 + 1);
  7.   adlog::DEBUG("ObTextRetrievalTokenIter::get_next_batch_with_cache start");
  8.   adlog::DEBUG("available_count: ", available_count);
  9.   bool have_to_more_data_flag = false;
  10. // 这里就是判断本次阅读的数量是否够,如果不够的话,就判断一下是否还可以让对应的迭代器接着去读数据存数据
  11. // is_completed_这个就是表示迭代器是否扫描结束了、
  12.   if (available_count < capacity and (!own_cache_data_->is_completed_)) {
  13.     ret = load_more_data_to_cache(capacity - available_count);
  14.     if (ret == OB_ITER_END) {
  15.       own_cache_data_->is_completed_ = true;
  16.       adlog::DEBUG("map里的缓存迭代器走到头了");
  17.       ret = OB_SUCCESS;
  18.     }
  19.   } else{
  20.     adlog::DEBUG("起到了缓存的效果");
  21.   }
  22.   available_count = own_cache_data_->total_count_ - (cache_read_idx_ + last_read_count_ - 1 + 1);
  23.   int64_t read_count = std::min(available_count, capacity);
  24.   adlog::DEBUG("token缓存里面: cache_read_idx_: ", cache_read_idx_," read_count: ",read_count, " total: ",own_cache_data_->total_count_);
  25.   if (read_count <= 0) {
  26.     ret = OB_ITER_END;
  27.   } else {
  28.     count = read_count;
  29.     cache_read_idx_ += last_read_count_;
  30.     last_read_count_ = read_count;
  31.   }
  32.   return ret;
  33. }
  34. int ObTextRetrievalTokenIter::load_more_data_to_cache(int64_t need_data_cnt) {
  35.   // 获取缓存管理器实例
  36.   adlog::DEBUG("开始尝试让缓存追加数据");
  37.   int ret = OB_SUCCESS;
  38.   TokenCacheManager &cache_mgr = TokenCacheManager::get_instance();
  39.   // 获取存储的原始迭代器
  40.   ObTextRetrievalTokenIter *origin_iter = cache_mgr.get_origin_iter(token_name_);
  41.   if (OB_ISNULL(origin_iter)) {
  42.     ret = OB_ERR_UNEXPECTED;
  43.     LOG_WARN("failed to get origin iter from cache manager", K(ret),
  44.              K(token_name_));
  45.     adlog::DEBUG("怎么会 origin_iter 会 空指针");
  46.   } else {
  47.     // 获取或创建缓存数据
  48.     TokenCacheData *cache_data = cache_mgr.get_cache_data(token_name_);
  49.     if (OB_ISNULL(cache_data)) {
  50.       adlog::DEBUG("ERROR! 竟然没有cache_data");
  51.     }
  52.     // 从原始迭代器扫描数据并添加到缓存
  53.     int64_t loaded_count = 0;
  54.     while (OB_SUCC(ret) && loaded_count < need_data_cnt) {
  55.       int64_t batch_count = 0;
  56.       // int64_t capacity = std::min(need_data_cnt - loaded_count, origin_iter->max_batch_size_);
  57.       if (OB_FAIL(origin_iter->get_next_batch(origin_iter->max_batch_size_, batch_count))) {
  58.         if (OB_UNLIKELY(OB_ITER_END != ret)) {
  59.           LOG_WARN("failed to get next batch from origin iter", K(ret));
  60.         } else {
  61.           // ret = OB_SUCCESS;
  62.           adlog::DEBUG("map里的 缓存迭代器 到的末尾");
  63.           break; // 到达末尾,正常结束
  64.         }
  65.       } else if (batch_count > 0) {
  66.         // 提取数据并添加到缓存
  67.         if (OB_FAIL(extract_and_add_data_to_cache(*origin_iter, *cache_data, batch_count))) {
  68.           LOG_WARN("failed to extract and add data to cache", K(ret));
  69.         } else {
  70.           loaded_count += batch_count;
  71.         }
  72.       }
  73.     }
  74.     if (OB_SUCC(ret)) {
  75.       LOG_DEBUG("successfully loaded data to cache", K(loaded_count), K(token_name_));
  76.       adlog::DEBUG("应该是追加成功了 loaded_count: ", loaded_count, "  token_name_: ", token_name_);
  77.     }
  78.   }
  79.   return ret;
  80. }
复制代码
关于缓存的大概内容就是这样,按道理讲应该是对的? 但是很可惜有问题。
如果这个实现出来了,QPS应该会跑到600那里的,对应到决赛榜单就会是第19名,堪堪进去。
postscript

最终,这个比赛结束了。失败总会贯穿人生始终。来年大概也不会参加这个比赛了,两个队友要考研,我应该到时候会找实习 or work吧。
感谢我的队友,让我这次比赛可以专注于内核方面,rag赛道我是一点也不知道啊。
其实本来以为进20无望,但是实现出来了topn下推,以及rag赛道分数突然高了起来,导致最终又有了动力去搞,虽然结局并不美好...
最终还献祭了自己挂了一科阿巴阿巴。
其实这个比赛,前半个多月,我都没有什么进展,卡index merge的bug也卡了很久。一开始也打算去实现一些谓词过滤的下推,还没有实现出来。
这是第一次去阅读这么大型的代码,其实个人感觉貌似没有学到很多数据库的知识。倒是感觉到了ob数据库代码一些设计是很牛的。
其实学到更多的,就是看这些代码,去理解他是怎么跑起来的,怎么做可以让他跑得更快。
总的来说,其实还是很有意思的。说不定明年还有机会可以再参加一次呢?
接下来要考虑找实习的内容了,希望自己可以有一个不错的结果。

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册