日期:2025.12.20(凌晨) 12.25
个人总结:
有段时间没有写blog了,自从ICPC退役之后,一刻也没有为iCPC哀悼,紧接着登场的是OB比赛。
印象中西安区域赛是10月中旬那里吧,基本上打完西安区域赛就到ob的比赛了。
决赛是22号的下午6点结束,距离比赛结束还有两天的时间,很不幸的是22号我有考试,并且这门课这学期从来没有看过,所以实际的code时间大概就剩下了今天这一整天了。能不能把接下来的优化做出来,感觉已经很不好说了,由于改动到了一个很基本的头文件,导致现在要等很久编译,索性来补一篇这场比赛的记录。
ob的比赛真的要比我想象中的要难不少,其实在暑假打完数据库的比赛后听说了ob的比赛有点想参加,但是听说特别难于是放弃。
但是有个队伍差一个人,是上一次数据库比赛的前一个队伍,他们问我要不要一起打,我直接狠狠答应。于是就有了非常痛苦煎熬的道路。
update:12.25
决赛排名23名,无缘全国前20了qwq。
perface:
参赛队伍是[登山],初赛排名全国排名第十,决赛排名23名。
初赛之所以排名高,是因为我们在比赛前就规划好了先把去年2024年的题给做了,保证每个题都起码有一个人做过。由于那段时间和ICPC的时间重合,所以基本上都是白天ICPC,晚上数据库走起,初赛负责的题目并不是太多。有一个时间差,导致我们初赛满分的很快,这里感谢队长,把rag和初赛ob赛题的内容给包了,让我们初赛通过的时间很快。
但是我觉得我个人非常感觉后悔的一件事就是,在初赛满分了之后,我去搞了一个minilsm的lab,中间提了两个PR就紧接着去ob决赛了。现在想来,为什么当时打算做这个mini-lsm,是因为觉得决赛可能会用到,但是没有过这种性能优化的经验,导致走了很多弯路,最终这个lsm其实并没有对我的优化起到帮助。(虽然提了人生中前两个PR让我有点小窃喜)。
最后决赛两天一夜的时间去实现缓存,但是却没有实现出来,说实在的打击挺大的,因为我个人觉得这个缓存的实现难度并不是太高,甚至我最开始不是使用的KVCache,而是直接套了个hashmap去搞,但是最终火焰图发现在走get_next_batch_with_cache的时候几乎都会走laod_more_data,也就代表着我的缓存几乎没有什么用。在还有几分钟就要结束的时候才发现到了这个,真的觉得自己是个傻b。尽管拼命去找为什么了,最后还是惨淡收场。
ok说的有点长了,这里说一下本人负责的题目:
初赛:(除去了简单题):alias + simple_sub_query + complex_sub_query + create_view
复赛:内核赛道:index_merge优化 + 构建ID列索引 + 下推topn优化 + 缓存dim_iter or token_iter (rag 赛道是一点没有看过)
初赛
由于距离初赛都过去了快2个月的时间,我对于代码也并不是印象太深刻了,所以在这里挖一个坑吧,以后后续说不定回来填。
决赛
首先大概说一下,决赛的内容就是让我们去改动seekdb的源码,对于一条sql语句(类似是这个样子,实际的筛选条件由测试脚本决定)- SELECT docid_col, MATCH(fulltext_col) AGAINST('test') as _score
- FROM test_insert_en
- WHERE MATCH(fulltext_col) AGAINST('text')
- and base_id in ('base_id_1', 'base_id_2', 'base_id_3')
- and id < 1000
- ORDER BY _score DESC
- LIMIT 10;
复制代码 进行优化,QPS肯定越高越好。初始的QPS官方评测机跑出来是 14 QPS上下浮动。
这里先大概说明,以下四个优化大概最后能优化到QPS600+。
index_merge 优化
其实这个思路的来源是官方的决赛指导中,提出了一个二阶段index merge优化的思路,让我知道了原来还有索引合并这种东西。官方可能第一阶段没有研究的很好?因为是演示时拿的oceanbase,然后直接建立在了index merge的基础上给了我们一个优化思路,但是seekdb本身甚至跑不起来index merge。
于是我觉得,可以先尝试实现index merge,之后再考虑这个二阶段的事情。
然后翻阅了源码后发现,seekdb原生是支持or条件的index merge,并没有支持and条件的(也没有支持IN条件表达式)。所以我们的目的就是让seekdb去支持and条件下的index merge。其实这部分可以去看oceanbase的源码,但是我当时脑子其实并没有转过来,因为发现ob的源码里是对于index merge and的情况去搞了一个index_merge_and_iter,我觉得有点麻烦,索性就直接自己搞了。
这里首先说明的是,在join_order.cpp里面,做了一些关于index_merge_or的一些情况。这个文件主要是用来进行一些路径的创建。
这里有一个大概的执行顺序(我也不是特别的确定):对于我们要走的index merge方法,优化器会先生成一条index merge的路径,然后根据这个路径去创建逻辑计划树,再然后是物理执行树,然后会是一个迭代器树,主要的代码运行也基本都是耗在这个迭代器树上,会进行各种扫描。
然后我们会发现,join_order.cpp里面,有一些关于index_merge_or的内容,但是缺少了and的情况。
所以我们就是要在这里进行修改。- create_access_paths -> create_index_merge_access_paths -> get_candi_index_merge_trees -> get_valid_index_merge_indexes
- -> generate_candi_index_merge_trees
- !candi_index_trees.empty() -> check_candi_index_trees_match_hint
- -> ......
- -> get_valid_index_merge_indexes
- -> generate_candi_index_merge_trees
- -> prune_candi_index_merge_trees
- -> do_create_index_merge_paths -> choose_best_selectivity_branch
- -> root_node->formalize_index_merge_tree()
- -> build_access_path_for_scan_node
- -> create_one_index_merge_path
- -> ......
- -> access_paths.push_back(static_cast(index_merge_path)))
- -> check_index_merge_paths_contain_fts
- (!is_match_hint && !contain_fts) -> prune_index_merge_path
复制代码 这里是我当时做的时候列出来的一个函数的执行过程。
其中generate_candi_index_merge_trees 这个函数是用来构建一些候选的index_merge_tree。
这里有一个小地方就是:我们可以通过简单地打日志的办法,发现对于filter来说,如果支持了这种or的情况,那么多半filter的count()就是1,ob会对于这种情况,也就是filter搞出来一个树,这个树本身也是为了支持or条件而搞出来了的。那些or条件的内容都是filter的children。
那如果我们全是and的情况,ob并不会搞出来一个树,而就是一个个节点本身,也就是说,如果我们的谓词筛选是: where a < 1 and b > 1, 这种情况filter的count()就是2。
所以我的代码修改就是在这个函数里,加上对于and情况的支持:- // 检查所有filters是否都是可索引的简单条件
- ObSEArray<ObIndexMergeNode*, 4> valid_nodes;
- ObSEArray<bool, 4> valid_flags;
-
- for (int64_t i = 0; OB_SUCC(ret) && i < filters.count(); ++i) {
- ObRawExpr *filter = filters.at(i);
- if (OB_ISNULL(filter) ||
- filter->get_expr_type() == T_OP_OR ||
- filter->get_expr_type() == T_OP_AND) {
- continue;
- }
-
- ObIndexMergeNode *candi_node = NULL;
- bool is_valid = false;
- if (OB_FAIL(generate_candi_index_merge_node(ref_table_id,
- filter,
- valid_index_ids,
- valid_index_cols,
- candi_node,
- is_valid))) { //在这个函数里面我们也会做适当地修改
- LOG_WARN("failed to generate node for simple filter", K(ret), KPC(filter));
- } else if (is_valid && OB_NOT_NULL(candi_node)) {
- valid_nodes.push_back(candi_node); //如果发现ok了,我们就放进去。
- valid_flags.push_back(true);
- } else {
- valid_flags.push_back(false);
- }
- }
- // 如果所有条件都可索引,则构建AND树
- if (valid_flags.count() == filters.count() && valid_flags.count() > 1 && valid_nodes.count() > 1) {
- ObIndexMergeNode *and_root = NULL; //这里我就十分暴力的去创建出来一个INderMergeNode root,去支持and的情况。
- if (OB_NOT_NULL(and_root = OB_NEWx(ObIndexMergeNode, allocator_))) {
- and_root->node_type_ = INDEX_MERGE_INTERSECT;
- for (int64_t i = 0; i < valid_nodes.count(); ++i) {
- if (OB_FAIL(and_root->children_.push_back(valid_nodes.at(i)))) {
- LOG_WARN("failed to add child to AND root", K(ret));
- }
- }
- adlog::DEBUG("and_root->node_type_ ",(int64_t)and_root->node_type_);
- adlog::DEBUG("and_root->children_ count() ",and_root->children_.count());
- if (OB_FAIL(and_root->formalize_index_merge_tree())) {
- LOG_WARN("failed to formalize AND tree", K(ret));
- } else {
- if (OB_FAIL(candi_index_trees.push_back(and_root))) {
- LOG_WARN("failed to push back AND tree", K(ret));
- } else {
- and_root->gen_code_ = 114514;
- adlog::DEBUG("Added AND merge tree for ", valid_nodes.count(), " conditions");
- }
- }
- adlog::DEBUG("and_root->node_type_ ",(int64_t)and_root->node_type_);
- adlog::DEBUG("and_root->children_ count() ",and_root->children_.count());
- }
- }
复制代码 但是这里其实还有一点,就是ob本身其实时时刻刻都会有index_merge的情况。这应该是一些内部的优化吧,我并没有很深入了解。但是这种情况其实是和我们要创建index_merge_and的情况是冲突的。
这里就不得不提一下这里做法的弊端了:由于赛题本身是对混合查询做优化,所以这里index_merge的优化其实是很重要的。有一个关于计算cost的情况我并没有提及,是因为赛时时间过于紧张,个人期望是可以走index merge就会走index merge,所以cost方面我直接设定为0。这显然并不好,后续应当对这种情况进行完善。
接着说回来,对于后台时刻进行别的index merge的情况,这里对于ObIndexMergeNode,我创建了一个gen_code_,用来标记判断是后台的别的情况的index merge,还是我这边为了混合查询而做的index merge。(这里是用于进行后续的判断的)。
而对于generate_candi_index_merge_node这个函数,其实主要是修改- else {
- ObSEArray<uint64_t, 1> candicate_index_tids;
- if (!get_tables().equal(filter->get_relation_ids())) {
- is_valid_node = false;
- } else if (!(filter->has_flag(IS_SIMPLE_COND) ||
- filter->has_flag(IS_RANGE_COND) ||
- filter->has_flag(CNT_MATCH_EXPR) ||
- (filter->has_flag(IS_IN) ))) {
- is_valid_node = false;
复制代码 用来支持in的情况。
然后比较关键的还有这个:- // ADVISE
- // 在这个函数里面,是对于 (已经构建好了AND树的and_root,去构建出来路径)
- int ObJoinOrder::do_create_index_merge_paths
复制代码 其中,这里我们主要是修改了原本对于INTERSECT的情况的判断,因为之前是对于INTERSECT的情况,我们是要选择出来一个最优解的执行的,而不是全都要,去跑index merge。- int ObJoinOrder::choose_best_selectivity_branch(ObIndexMergeNode *&candi_node) {
- adlog::DEBUG("ObJoinOrder::choose_best_selectivity_branch START");
- int ret = OB_SUCCESS;
-
-
- if (OB_ISNULL(candi_node)) {
- ret = OB_ERR_UNEXPECTED;
- LOG_WARN("get unexpected null", K(ret), KPC(candi_node));
- adlog::DEBUG("choose_best_selectivity_branch:: get unexpected null");
- } else if (INDEX_MERGE_UNION == candi_node->node_type_) {
- adlog::DEBUG("choose_best_selectivity_branch:: INDEX_MERGE_UNION == candi_node->node_type_");
- for (int64_t i = 0; OB_SUCC(ret) && i < candi_node->children_.count(); ++i) {
- if (OB_FAIL(SMART_CALL(choose_best_selectivity_branch(candi_node->children_.at(i))))) {
- LOG_WARN("failed to choose best branch for child", K(ret), KPC(candi_node->children_.at(i)));
- }
- }
- } else if (INDEX_MERGE_INTERSECT == candi_node->node_type_) {
- adlog::DEBUG("choose_best_selectivity_branch:: INDEX_MERGE_INTERSECT == candi_node->node_type_");
-
- bool has_scan_children = true;
- // for (int64_t i = 0; i < candi_node->children_.count(); ++i) {
- // if (OB_NOT_NULL(candi_node->children_.at(i)) &&
- // (candi_node->children_.at(i)->node_type_ != INDEX_MERGE_SCAN and
- // candi_node->children_.at(i)->node_type_ != INDEX_MERGE_FTS_INDEX)) {
- // adlog::DEBUG("candi_node->children_.at(i)->node_type_ is error idx: ",i," type : ",(int64_t)candi_node->children_.at(i)->node_type_);
- // has_scan_children = false;
- // break;
- // }
- // }
- adlog::DEBUG("choose_best_selectivity_branch:: candi_node->children_.count() ",candi_node->children_.count());
- adlog::DEBUG("choose_best_selectivity_branch:: has_scan_children ",has_scan_children);
-
- bool go_index_merge_path_flag = true;
-
- if(candi_node->gen_code_ != 114514) go_index_merge_path_flag = false;
-
- if(go_index_merge_path_flag){
- for (int64_t i = 0; OB_SUCC(ret) && i < candi_node->children_.count();++i) {
- if
- (OB_FAIL(SMART_CALL(choose_best_selectivity_branch(candi_node->children_.at(i)))))
- {
- LOG_WARN("failed to choose best branch for child", K(ret),
- KPC(candi_node->children_.at(i)));
- }
- }
- } else {
- //......
- //......
-
-
- }
- adlog::DEBUG("ObJoinOrder::choose_best_selectivity_branch END");
-
- return ret;
- }
复制代码 这里就是对于我们之前的gen_code_去进行判断了,之所以说之前有冲突就是在这个地方。所以我们的gen_code_就派上用了,可以通过这个来区别开来,这个INTERSECT,到底是我们要跑的index merge and树,还是后台运行的别的内容。
大概是改完以上的内容后,可以跑通index_merge了,可以用explain检测一下。
但是大概率会出现Error的情况。因为关于实现index_merge_and,我们还需要修改index_merge_iter.cpp里面的内容。
之所以会出现Error的情况,是因为seekdb原生不支持index_merge_and,所以在ObDASIndexMergeIter::inner_init函数里面,对于INTERSECT的情况专门返回了ERROR。
所以我们改一下就好。- if (OB_UNLIKELY(merge_type_ != INDEX_MERGE_UNION and merge_type_ != INDEX_MERGE_INTERSECT)) {
- ret = OB_INVALID_ARGUMENT;
- LOG_WARN("invalid merge type", K(merge_type_));
- } else
- if (OB_FAIL(CURRENT_CONTEXT->CREATE_CONTEXT(mem_ctx_, context_param))) {
- LOG_WARN("failed to create index merge memctx", K(ret));
- } else {
- common::ObArenaAllocator &alloc = mem_ctx_->get_arena_allocator();
- child_iters_.set_allocator(&alloc);
- //....
复制代码 这样做了之后,我们的sql语句应该是会有返回结果的,但是召回率不出意外是有问题的。
这是因为intersect_get_next_rows函数内部实现的不对。
其实这里我们参考一下union_get_next_rows的做法就大概能知道,这里少了关于result_buffer_的应用。导致我们返回的行有了数据覆盖等问题。
所以我们应该仿照union_get_next_rows,去重写一下这个intersect_get_next_rows就好了。
值得注意的是:关于这个函数的返回条件了,我最开始并没有写“当有迭代器扫描结束后就直接使得ret = ITER_END”,导致测试的时候一直超时。所以这个点一定要写。
大概的实现内容:- int ObDASIndexMergeIter::intersect_get_next_rows(int64_t &count, int64_t capacity)
- {
- int ret = OB_SUCCESS;
- adlog::DEBUG("ObDASIndexMergeIter::intersect_get_next_rows start");
- // {
- // adlog::DEBUG("INTERSECT rowkey_exprs_ count", K(rowkey_exprs_->count()));
- // for (int64_t i = 0; i < rowkey_exprs_->count(); i++) {
- // // adlog::DEBUG("rowkey_expr", i, std::to_string(*rowkey_exprs_->at(i)));
- // {
- // char buf[1024];
- // int64_t pos = 0;
- // pos = rowkey_exprs_->at(i)->to_string(buf, sizeof(buf));
- // adlog::DEBUG("rowkey_expr", i, std::string(buf, pos));
- // }
- // }
- // }
-
- count = 0;
- result_buffer_.reuse();
-
- while (OB_SUCC(ret) && count < capacity) {
- /* try to fill each child store */
- int64_t output_idx = OB_INVALID_INDEX;
- int cmp_ret = 0;
- int64_t child_rows_cnt = 0;
-
- for (int64_t i = 0; OB_SUCC(ret) && i < child_stores_.count(); i++) {
- IndexMergeRowStore &child_store = child_stores_.at(i);
- child_rows_cnt = 0;
- if (!child_store.have_data()) {
- if (!child_store.iter_end_) {
- ObDASIter *child_iter = child_iters_.at(i);
- if (OB_ISNULL(child_iter)) {
- ret = OB_ERR_UNEXPECTED;
- adlog::DEBUG("unexpected nullptr", K(i));
- } else {
- ret = child_iter->get_next_rows(child_rows_cnt, capacity);
- if (OB_FAIL(ret)) {
- if (OB_ITER_END != ret) {
- adlog::DEBUG("WARNING child_iter->get_next_rows(child_rows_cnt, capacity);");
- }
- }
- adlog::DEBUG("child_iter: ",K(i)," ", K(child_rows_cnt));
- if (OB_ITER_END == ret && child_rows_cnt > 0) {
- ret = OB_SUCCESS;
- }
- if (OB_SUCC(ret)) {
- if (OB_FAIL(child_store.save(true, child_rows_cnt))) {
- adlog::DEBUG("WARNING failed to save child rows", K(child_rows_cnt), K(ret));
- } else if (OB_FAIL(compare(i, output_idx, cmp_ret))) {
- adlog::DEBUG("WARNING index merge failed to compare row", K(i), K(output_idx), K(ret));
- } else if (child_iter->get_type() == DAS_ITER_SORT) {
- adlog::DEBUG("i: ",i,"child_iter->get_type() == DAS_ITER_SORT" );
- reset_datum_ptr(child_iter->get_output(), child_rows_cnt);
- }
- } else if (OB_ITER_END == ret) {
- child_store.iter_end_ = true;
- ret = OB_SUCCESS;
- } else {
- adlog::DEBUG("WARNING failed to get next rows from child iter", K(ret));
- }
- }
- }
- } else if (OB_FAIL(compare(i, output_idx, cmp_ret))) {
- adlog::DEBUG("WARNING index merge failed to compare row", K(i), K(output_idx), K(ret));
- }
- }
- if (OB_FAIL(ret)) {
- adlog::DEBUG("WARNING OB_FAIL(RET)");
- } else if (output_idx == OB_INVALID_INDEX) {
- ret = OB_ITER_END;
- adlog::DEBUG("ret = OB_ITER_END");
- } else {
- bool all_matched = true;
- for (int64_t i = 0; OB_SUCC(ret) && i < child_stores_.count(); i++) {
- if (output_idx == i) {
- continue;
- }
- if ((!child_stores_.at(i).have_data())) {
- all_matched = false; // 这个child没有数据,不匹配
- ret = OB_ITER_END;
- } else if (OB_FAIL(compare(i, output_idx, cmp_ret))) {
- adlog::DEBUG("index merge failed to compare row", K(i), K(output_idx), K(ret));
- } else if (cmp_ret == 0) {
-
- } else {
- all_matched = false; // rowkey不匹配
- }
- }
- if (OB_FAIL(ret)) {
- adlog::DEBUG("OB_FAIL(ret)");
- continue;
- }
- if (!all_matched) {
- child_stores_.at(output_idx).cur_idx_++;
- continue;
- }
- for (int64_t i = 0; OB_SUCC(ret) && i < child_stores_.count(); i++) {
- if (output_idx == i) {
- continue;
- }
- child_stores_.at(i).to_expr();
- }
- child_stores_.at(output_idx).to_expr();
- if (OB_FAIL(save_row_to_result_buffer())) {
- adlog::DEBUG("failed to save row to result buffer", K(ret));
- } else {
- count += 1;
- }
- }
- }
- if(OB_FAIL(ret)){
- if (ret == OB_ITER_END) {
- adlog::DEBUG("intersect_get_next_rows : OB_FAIL(ret) ret == OB_ITER_END");
- } else {
- adlog::DEBUG("intersect_get_next_rows : OB_FAIL(ret)");
- }
- }
- adlog::DEBUG("intersect_get_next_rows :",K(count));
-
- if (OB_ITER_END == ret && count > 0) {
- ret = OB_SUCCESS;
- }
- if (OB_SUCC(ret) && count > 0) {
- if (OB_FAIL(result_buffer_.to_expr(count))) {
- adlog::DEBUG("failed to convert result buffer to exprs", K(ret));
- LOG_WARN("failed to convert result buffer to exprs", K(ret));
- }
- }
- return ret;
- }
复制代码 这里的排查内容意外的很辛苦,因为我一度怀疑过很多地方,例如什么rowkey不匹配,FTS返回的结果有问题,没有SORT等,最后发现都没有问题。只能说其实怀疑的这些地方,通过检测union的正确性,就可以不用浪费那么多时间,可能这也是为后面没有时间写出来缓存埋下伏笔了吧。
实现这个优化,QPS会从14到150那里。
构建ID列索引
这个其实反倒是没有注意到,因为我发现我的测试脚本最开始很有问题,官方后来修复了之后我并没有重新拉取,而且导致我的本地出现了一些莫名其妙的问题。例如,我这边本地测试的时候,id列其实是有索引的。但是我后来才知道原来并没有索引,这个在官方最后一周给出来了,通过这个优化,可以从150QPS到200QPS。
这里是队友写的了,我就直接贴代码了:
主要是在ObCreateTableResolver::resolve_table_elements中- if (OB_SUCC(ret)) {
- // MySQL 模式下,AUTO_INCREMENT 列必须有索引。如果用户未手动为自增列建索引,
- // 自动为该列创建一个普通(非唯一)索引,避免违反引擎约束。
- if (lib::is_mysql_mode() && 0 != autoinc_column_id && !table_schema.is_external_table()) {
- ObCreateTableStmt *create_table_stmt = static_cast<ObCreateTableStmt*>(stmt_);
- ObColumnSchemaV2 *autoinc_col = NULL;
- if (OB_ISNULL(create_table_stmt)) {
- ret = OB_ERR_UNEXPECTED;
- SQL_RESV_LOG(WARN, "unexpected null create_table_stmt when auto add index for autoinc", K(ret));
- } else if (OB_ISNULL(autoinc_col =
- create_table_stmt->get_create_table_arg().schema_.get_column_schema(
- autoinc_column_id))) {
- ret = OB_ERR_UNEXPECTED;
- SQL_RESV_LOG(WARN, "failed to get autoinc column schema when auto add index",
- K(ret), K(autoinc_column_id));
- } else if (autoinc_col->is_rowkey_column()) {
- // 已经在主键/rowkey 上,无需额外索引
- } else {
- // 构造一个仅包含自增列的普通本地索引
- HEAP_VARS_2((ObCreateIndexStmt, create_index_stmt), (ObPartitionResolveResult, resolve_result)) {
- reset();
- index_attributes_set_ = OB_DEFAULT_INDEX_ATTRIBUTES_SET;
- index_arg_.reset();
- sort_column_array_.reset();
- store_column_names_.reset();
- hidden_store_column_names_.reset();
- index_keyname_ = NORMAL_KEY;
- index_scope_ = NOT_SPECIFIED; // MySQL 默认本地索引
- name_generated_type_ = GENERATED_TYPE_SYSTEM;
- has_index_using_type_ = false;
- ObColumnSortItem sort_item;
- sort_item.column_name_ = autoinc_col->get_column_name_str();
- sort_item.prefix_len_ = -1;
- sort_item.order_type_ = common::ObOrderType::ASC;
- ObString uk_name;
- if (OB_FAIL(resolve_index_name(nullptr, sort_item.column_name_, false, uk_name))) {
- SQL_RESV_LOG(WARN, "resolve auto index name failed", K(ret));
- } else if (OB_FAIL(add_sort_column(sort_item))) {
- SQL_RESV_LOG(WARN, "add auto index column failed", K(ret), K(sort_item));
- } else if (OB_FAIL(generate_index_arg(false /*process_heap_table_primary_key*/))) {
- SQL_RESV_LOG(WARN, "generate auto index arg failed", K(ret));
- } else {
- ObCreateIndexArg &create_index_arg = create_index_stmt.get_create_index_arg();
- ObSArray<ObPartitionResolveResult> &resolve_results =
- create_table_stmt->get_index_partition_resolve_results();
- ObSArray<obrpc::ObCreateIndexArg> &index_arg_list =
- create_table_stmt->get_index_arg_list();
- index_arg_.index_key_ = static_cast<int64_t>(index_keyname_);
- if (OB_FAIL(create_index_arg.assign(index_arg_))) {
- LOG_WARN("fail to assign auto index arg", K(ret));
- } else if (OB_FAIL(resolve_results.push_back(resolve_result))) {
- LOG_WARN("fail to push back auto index resolve result", K(ret));
- } else if (OB_FAIL(index_arg_list.push_back(create_index_arg))) {
- LOG_WARN("fail to push back auto index arg", K(ret));
- }
- }
- }
- }
- }
- }
复制代码 下推topn
其实本来是打算去实现官方说的二阶段优化的,但是我写到中间,发现太难实现了。需要改掉很多内容,路径,逻辑计划执行计划迭代器树等内容都需要更改。所以直接放弃了。
但是有个我们可以发现一个内容,就是去用explain,我们可以发现,主要是有两层,第一层是一个sort算子,里面内置了limit,去限制个数。第二层是我们目前已经改过了的index merge。
会发现其实有一个很耗时的内容,就是我们第二层扫描的结果很多,但是都需要返回到第一层再进行过滤,这个太慢了,我们完全可以在第二层的时候就把他过滤掉,毕竟我们是可以获取到这些数据的。耗时的一个关键点还有是因为我们在index merge的时候,那个result_buffer是进行拷贝操作的,所以也会有耗时内容。
那么我们要做什么呢?
首先我们是需要把limit给提取出来。但是注意,提取的并不是limit的value,而是limit_expr。
我一开始的时候没有考虑明白,把limit_value给记录了下来,放到了index_merge_ctdef里面,但是这样显然是不对的。毕竟这个ctdef是在构建执行计划的时候才会采用,但是我们之后再执行这样的语句的时候,其实并不会构建ctdef,而是会走缓存路线,只会构建出来rtdef。所以我们要做的,应该是把limit_expr给存到ctdef里面,然后每一次重新生成rtdef的时候,通过limit_expr把值算出来,再存到rtdef里面。
关于limit_expr的提取,我是在ObLogPlan::candi_allocate_order_by做的。
我的目的主要是,希望可以检测到,如果我们是这个sort+limit的形式,并且下层是index_merge的时候,把sort算子去掉,提取到它的limit_expr,然后放到我们的下层去。- } else {
- for (int64_t i = 0; OB_SUCC(ret) && i < candidates_.candidate_plans_.count(); i++) {
- bool is_reliable = false;
- CandidatePlan candidate_plan = candidates_.candidate_plans_.at(i);
- OPT_TRACE("generate order by for plan:", candidate_plan);
- if (OB_FAIL(create_order_by_plan(candidate_plan.plan_tree_,
- order_items,
- topn_expr,
- is_fetch_with_ties))) {
- LOG_WARN("failed to create order by plan", K(ret));
- } else if (NULL != topn_expr && OB_FAIL(is_plan_reliable(candidate_plan.plan_tree_,
- is_reliable))) {
- LOG_WARN("failed to check if plan is reliable", K(ret));
- } else if (is_reliable) {
- ret = limit_plans.push_back(candidate_plan);
- } else {
- // 在这里试试看能不能把sort算子给去掉 advise
- adlog::DEBUG("在这里试试看能不能把sort算子给去掉");
- do {
- int ret = OB_SUCCESS;
- ObLogicalOperator *&top = candidate_plan.plan_tree_;
- if (OB_ISNULL(top) || log_op_def::LOG_SORT != top->get_type()) {
- // 不是sort算子,无需处理
- adlog::DEBUG("不是sort算子啊");
- } else {
- ObLogSort *sort_op = static_cast<ObLogSort*>(top);
- ObLogicalOperator *child = sort_op->get_child(0);
- if (OB_NOT_NULL(child) && log_op_def::LOG_TABLE_SCAN == child->get_type()) {
- ObLogTableScan *table_scan = static_cast<ObLogTableScan *>(child);
- if (table_scan->use_index_merge()) {
- // 使用的index merge
- adlog::DEBUG("下层用的是index merge");
- // get_limit_expr
- // auto && limit_expr = table_scan->get_limit_expr();
- if (sort_op->get_topn_expr() != nullptr) {
- // (limit_expr) = (sort_op->get_topn_expr());
- table_scan->set_limit_expr(sort_op->get_topn_expr());
- // 保留 Sort 算子用于精排(两阶段检索)
- sort_op->set_topn_expr(nullptr);
- adlog::DEBUG("把sort里面的limit_expr放到了当前的table_"
- "scan的limit_count_expr里了");
- table_scan->set_parent(nullptr);
- }
- // 保留 Sort 算子,不跳过
- top = child;
- }
- else {
- adlog::DEBUG("但是下层使用的不是index merge");
- }
- } else {
- adlog::DEBUG("下层不是table scan啊");
- }
- }
-
- }while(0);
- ret = order_by_plans.push_back(candidate_plan);
- }
复制代码 另外,当时其实并没有分析的很明白。
在ObSelectLogPlan::allocate_plan_top里也做了一遍。- // allocate root exchange
- if (OB_SUCC(ret) && is_final_root_plan()) {
- // allocate material if there is for update without skip locked.
- // FOR UPDATE SKIP LOCKED does not need SQL-level retry, hence we don't need a MATERIAL to
- // block the output.
- if (optimizer_context_.has_no_skip_for_update()
- && OB_FAIL(candi_allocate_for_update_material())) {
- LOG_WARN("failed to allocate material", K(ret));
- //allocate temp-table transformation if needed.
- } else if (!get_optimizer_context().get_temp_table_infos().empty() &&
- OB_FAIL(candi_allocate_temp_table_transformation())) {
- LOG_WARN("failed to allocate transformation operator", K(ret));
- } else if (OB_FAIL(candi_allocate_root_exchange())) {
- LOG_WARN("failed to allocate root exchange", K(ret));
- } else {
- LOG_TRACE("succeed to allocate root exchange", K(candidates_.candidate_plans_.count()));
- }
- }
- // 在这里看能不能消除掉sort
- if (OB_SUCC(ret)) {
- for (int64_t i = 0; OB_SUCC(ret) && i < candidates_.candidate_plans_.count(); i++) {
- ObLogicalOperator *&top = candidates_.candidate_plans_.at(i).plan_tree_;
- if (OB_ISNULL(top) || log_op_def::LOG_SORT != top->get_type()) {
- continue;
- }
- ObLogicalOperator *child = top->get_child(0);
- if (OB_ISNULL(child) or
- log_op_def::LOG_TABLE_SCAN != child->get_type()) {
- adlog::DEBUG("下层不是table scan啊");
- continue;
- }
- ObLogTableScan *table_scan = static_cast<ObLogTableScan *>(child);
- if (!table_scan->use_index_merge()) {
- adlog::DEBUG("下层用的不是index merge 啊");
- continue;
- }
- adlog::DEBUG("下层用的是index merge");
- ObLogSort *sort_op = static_cast<ObLogSort *>(top);
-
- if (sort_op->get_topn_expr() != nullptr) {
- table_scan->set_limit_expr(sort_op->get_topn_expr());
- // 保留 Sort 算子用于精排(两阶段检索)
- sort_op->set_topn_expr(nullptr);
- adlog::DEBUG("把sort里面的limit_expr放到了当前的table_"
- "scan的limit_count_expr里了");
- table_scan->set_parent(nullptr);
- }
- // 保留 Sort 算子,不跳过
- top = child;
-
- }
- }
- }
- return ret;
- }
复制代码 但是我们做了以上内容后,explain发现,竟然上层的sort算子还是没有取消掉,明明明这里确实是提取出来了limit_expr。这是为什么呢?
是因为ObSelectLogPlan::candi_allocate_order_by_if_losted这个函数会进行一个补充,把丢失的order给弄回来。
所以这里我们也需要再特判一下:- int ObSelectLogPlan::candi_allocate_order_by_if_losted(ObIArray<OrderItem> &order_items)
- {
- int ret = OB_SUCCESS;
- bool re_allocate_happened = false;
- ObSEArray<CandidatePlan, 8> order_by_plans;
- if (!order_items.empty()) {
- candidates_.is_final_sort_ = true;
- for (int64_t i = 0; OB_SUCC(ret) && i < candidates_.candidate_plans_.count(); i++) {
- ObLogicalOperator *top = candidates_.candidate_plans_.at(i).plan_tree_;
- CandidatePlan &plan = candidates_.candidate_plans_.at(i);
- bool can_skip_sort = false;
- if (OB_NOT_NULL(top) && log_op_def::LOG_TABLE_SCAN == top->get_type()) {
- ObLogTableScan *table_scan = static_cast<ObLogTableScan*>(top);
- if (table_scan->use_index_merge() and table_scan->get_limit_expr() != nullptr) {
- can_skip_sort = true;
- adlog::DEBUG("这里标识跳过");
- }
- }
-
- if (!can_skip_sort) {
- if (OB_FAIL(create_order_by_plan(plan.plan_tree_, order_items, NULL, false))) {
- LOG_WARN("failed to create order by plan", K(ret));
- }
- }
- // if (OB_FAIL(create_order_by_plan(plan.plan_tree_, order_items, NULL, false))) {
- // LOG_WARN("failed to create order by plan", K(ret));
- // } else
- if (OB_FAIL(order_by_plans.push_back(plan))) {
- LOG_WARN("failed to push back", K(ret));
- } else if (top != candidates_.candidate_plans_.at(i).plan_tree_) {
- re_allocate_happened = true;
- }
- }
- candidates_.is_final_sort_ = false;
- if (OB_SUCC(ret) && re_allocate_happened) {
- int64_t check_scope = OrderingCheckScope::CHECK_SET;
- if (OB_FAIL(update_plans_interesting_order_info(order_by_plans, check_scope))) {
- LOG_WARN("failed to update plans interesting order info", K(ret));
- } else if (OB_FAIL(prune_and_keep_best_plans(order_by_plans))) {
- LOG_WARN("failed to prune and keep best plans", K(ret));
- } else { /*do nothing*/ }
- }
- }
- return ret;
- }
复制代码 做了以上内容后,我们现在可以把上层的sort算子去掉了,并且把limit_expr给提取了table_scan->set_limit_expr(sort_op->get_topn_expr());里面。
接下来,在ObTableScanOp::init_attach_scan_rtdef内容中,对于这个刚才说的情况进行判断:- //
- if (attach_ctdef->op_type_ == DAS_OP_INDEX_MERGE) {
- auto && non_const_ctdef = const_cast<ObDASBaseCtDef*>(attach_ctdef);
- auto &&merge_ctdef = static_cast<ObDASIndexMergeCtDef*>(non_const_ctdef);
- ObDASIndexMergeRtDef *merge_rtdef = static_cast<ObDASIndexMergeRtDef*>(attach_rtdef);
- int64_t limit = 0;
- bool is_null = false;
-
- LOG_INFO("INDEX_MERGE detected", "has_limit_expr", OB_NOT_NULL(merge_ctdef->limit_expr_));
-
- if (OB_NOT_NULL(merge_ctdef->limit_expr_) &&
- OB_FAIL(calc_expr_int_value(*merge_ctdef->limit_expr_, limit, is_null))) {
- LOG_WARN("failed to calc limit expr", K(ret));
- } else if (!is_null && limit > 0) {
- merge_rtdef->topn_limit_ = limit;
- merge_rtdef->enable_topn_pushdown_ = true;
- LOG_INFO("TopN pushdown ENABLED in table_scan_op", K(limit));
- } else {
- merge_rtdef->enable_topn_pushdown_ = false;
- LOG_INFO("TopN pushdown DISABLED in table_scan_op", K(is_null), K(limit));
- }
- }
复制代码 这样,就在merge_rtdef里面成功的赋值了,我们现在剩下的任务,就是在index_merge_iter.cpp里面搞一个堆出来,使得我们可以在这一层成功的过滤掉数据。
首先是在inner_init函数里面加上以下内容:(有一些自己搞的初始化的内容先暂时别管)- if (OB_SUCC(ret)) {
- if (OB_FAIL(result_buffer_.init(max_size_, eval_ctx_, output_, mem_ctx_->get_malloc_allocator()))) {
- LOG_WARN("failed to init merge result buffer", K(ret));
- } else {
- if (merge_rtdef_->enable_topn_pushdown_) {
- result_buffer_.enable_topn_ = true;
- result_buffer_.topn_finish_ = false;
- result_buffer_.first_flag_ = true;
- result_buffer_.topn_limit_ = merge_rtdef_->topn_limit_ ;
- } else {
- result_buffer_.enable_topn_ = false;
- result_buffer_.topn_finish_ = false;
- result_buffer_.first_flag_ = true;
- }
- }
- }
复制代码 然后稍加改造:- int ObDASIndexMergeIter::save_row_to_result_buffer() {
- int ret = OB_SUCCESS;
- if (!result_buffer_.enable_topn_) {
- // LOG_INFO("save_row: using normal add_row");
- return result_buffer_.add_row();
- }
- // LOG_INFO("save_row: using TopN add_row_with_topn");
- return result_buffer_.add_row_with_topn();
- }
复制代码 这里就不详细展示ObDASIndexMergeIter::MergeResultBuffer::add_row_with_topn()的内容了,就是简单地加入行和score,放到堆里面。
只是最后输出的内容,我们用first_flag进行了一个标记,当我们是index_merge第一次走完了的时候,就把堆里面的内容放到一个数组里面。然后之后我们如果再要输出结果的话,就直接从数组里面开始输出就好了。和堆就没有什么关系了。(代码中间删去了一些检测或者析构的内容)- //advise
- int ObDASIndexMergeIter::MergeResultBuffer::to_expr_from_topn(int64_t &size, int64_t capacity)
- {
- int ret = OB_SUCCESS;
- common::ObArray<TopNRow *> sorted_rows;
- //...
- if (first_flag_) {
- adlog::DEBUG("开始转移 : topn_heap_->count(): ", topn_heap_->count());
- out_arr_.reserve(topn_heap_->count());
- while (topn_heap_->count() > 0) {
- TopNRow *top = topn_heap_->top();
- adlog::DEBUG("top: score: ", top->score_);
- if (OB_FAIL(out_arr_.push_back(top))) {
- adlog::DEBUG("failed to push to topn_heap_out_", K(ret));
- break;
- }
- topn_heap_->pop();
- }
- first_flag_ = false;
- arr_cur_idx_ = -1;
- std::sort(out_arr_.begin(), out_arr_.end(),
- [](const TopNRow *a, const TopNRow *b) {
- return a->score_ > b->score_;
- });
- }
- if (arr_cur_idx_ == out_arr_.count() - 1) {
- size = 0;
- return ret;
- }
- int64_t begin_idx_ = arr_cur_idx_ + 1;
- int64_t end_idx_ = std::min(arr_cur_idx_ + capacity + 1, out_arr_.count());
- // Convert sorted rows to expressions in batch
- {
- ObEvalCtx::BatchInfoScopeGuard batch_info_guard(*eval_ctx_);
- batch_info_guard.set_batch_size(end_idx_ - begin_idx_);
-
- for (int64_t i = begin_idx_; OB_SUCC(ret) && i < end_idx_; ++i) {
- batch_info_guard.set_batch_idx(i);
- TopNRow *row = out_arr_.at(i);
- adlog::DEBUG("输出的分数 i: ", i , " socre: ", row->score_);
- if (OB_ISNULL(row) || OB_ISNULL(row->row_data_)) {
- ret = OB_ERR_UNEXPECTED;
- LOG_WARN("unexpected null row", K(ret), K(i));
- } else if (OB_FAIL(row->row_data_->to_expr<true>(*exprs_, *eval_ctx_))) {
- LOG_WARN("failed to convert row to expr", K(ret), K(i));
- }
- }
-
- if (OB_SUCC(ret)) {
- size = end_idx_ - begin_idx_;
- }
- // Clean up all rows
- // ......
- arr_cur_idx_ = end_idx_ - 1;
- }
-
- return ret;
- }
复制代码 这样,就基本实现了topn下推到index merge层,QPS从200会提升到300这里。这也是我们队伍在本次比赛做的所有优化了。下面提到的缓存在思路上是完全可以实现的,所以也来稍微讲讲。
缓存token_iter(dim_iter)
其实这里,我个人认为,缓存dim_iter也好,还是缓存dim_iter里面的token_iter,都是一样的。毕竟本身都是为了把结果记录下来。
其实思路很简单,就是通过火焰图分析后,发现这个获取全文索引的分数和doc_id有点太慢了,我们这里做一个缓存,把数据都记录下来应该会快很多。
其实自己的思路有点乱七八糟,最开始的时候,只是想着,当这个迭代器扫描结束了之后,我们把扫描得到的数据存到一个hashmap里面,以后再有它一样token的迭代器要扫描的话,就直接用之前获取到的数据就好了。
但是这样实现了之后,发现这个缓存竟然没怎么起到作用。原因是因为竟然很多缓存迭代器不会扫到ITER_END。这里后续大概了解了下,貌似是ob本身对于这个有topn情况下对于全文检索的一种优化,并不会把迭代器所有的内容都扫描完。所以关于这个缓存是如何存放的就成了一个问题。
我后来想的做法就是,既然迭代器无法保证全扫描过,那么我直接强制让这个迭代器在第一次扫的时候,强制他走完,把结果存到缓存里面,这样应该就可以了。
从理论上来讲,应该是没有问题的,但是结果却发现返回ERROR。再细看,是发现貌似是底层的一个compare出错了,我并不是太理解为什么,虽然dim_iter和token_iter都是对于batch去搞,每一次自己的bathc结束了就会进行底层的扫描,但是我还是觉得强制扫描完了之后,走这个缓存也不会有什么问题。
但是很不幸的是错了,而且比赛迫在眉睫(还有就是我觉得这个貌似对于内存来讲不太友好),我不得不再想一个别的办法:就是我们对于一个token_iter来说,它每次读取多少数据我们是知道的,那么我们可以来判断一下,当前这个迭代器读取的数据,缓存是否有对应的数据,如果有的话,那么就直接读取缓存就好了。如果没有,那么就说明我们缓存的内容不够本次的read。那么就让存放缓存的那个迭代器继续走batch,直到缓存存放的数据够这个迭代器用了,就停下来。相当于就是做了一个按需存放缓存,这个做法对内存也应该会更加的友好。- class DDManager {
- public:
- hash::ObHashMap<ObString, TokenCacheData*> cache_map_;
- hash::ObHashMap<ObString, ObTextRetrievalDaaTTokenIter*> origin_iter_map_;
- hash::ObHashMap<ObString, int64_t> cnt_map_;
- common::ObArenaAllocator allocator_;
- int64_t map_max_size_{150};
-
- public:
- DDManager() : allocator_(ObMemAttr(MTL_ID(), "TokenCacheAlloc")) {
- cache_map_.create(map_max_size_, "TokenCache", "TokenCache");
- origin_iter_map_.create(map_max_size_,"Iter","TokenIter");
- cnt_map_.create(map_max_size_,"aaaaa","bvbbbb");
- }
-
- ~DDManager() {
- // ......
- }
-
- static DDManager& get_instance() {
- static DDManager instance;
- return instance;
- }
-
- TokenCacheData* get_cache_data(const ObString &token) {
- TokenCacheData *cached_data = nullptr;
- cache_map_.get_refactored(token, cached_data);
- return cached_data;
- }
-
- int create_cache_data(const ObString &token, TokenCacheData *&cached_data) {
- int ret = OB_SUCCESS;
- void *buf = allocator_.alloc(sizeof(TokenCacheData));
- if (OB_ISNULL(buf)) {
- ret = OB_ALLOCATE_MEMORY_FAILED;
- } else {
- cached_data = new(buf) TokenCacheData();
- ret = cache_map_.set_refactored(token, cached_data);
- }
- return ret;
- }
-
- ObTextRetrievalDaaTTokenIter* get_origin_iter(const ObString &token) {
- ObTextRetrievalDaaTTokenIter *cached_data = nullptr;
- origin_iter_map_.get_refactored(token, cached_data);
- return cached_data;
- }
-
- int push_origin_iter(const ObString &token, ObTextRetrievalDaaTTokenIter *&cached_data) {
- int ret = OB_SUCCESS;
-
- ret = origin_iter_map_.set_refactored(token, cached_data);
-
- return ret;
- }
-
- int create_token_iter(const ObString &token, const ObTextRetrievalScanIterParam &orig_param) {
- int ret = OB_SUCCESS;
-
- // 创建新的param,使用Manager的分配器
- ObTextRetrievalScanIterParam new_param = orig_param;
- new_param.allocator_ = &allocator_; // 替换分配器
-
- // 创建新迭代器
- void *buf = allocator_.alloc(sizeof(ObTextRetrievalDaaTTokenIter));
- if (OB_ISNULL(buf)) {
- ret = OB_ALLOCATE_MEMORY_FAILED;
- } else {
- auto && new_iter = new(buf) ObTextRetrievalDaaTTokenIter();
- if (OB_FAIL(new_iter->init(new_param))) {
- LOG_WARN("failed to init token iter", K(ret));
- } else if (OB_FAIL(origin_iter_map_.set_refactored(token, new_iter))) {
- LOG_WARN("failed to store iter", K(ret));
- }
- }
- return ret;
- }
-
- // ....
- };
复制代码 这是当时乱写的一个版本。
然后我们在ObTextRetrievalDaaTTokenIter里加上- class ObTextRetrievalDaaTTokenIter final : public ObISRDaaTDimIter
- {
- // ADVISE 加上以下内容
- ADCacheData * own_cache_data_{nullptr};
- bool use_cache_{false};
- int64_t cache_read_idx_{-1};
-
- ObString token_name_;
- int force_build_cache();
- int init_with_cache();
- bool init_with_cache_flag_{false};
复制代码 然后这是当时实现的一些函数:- int ObTextRetrievalTokenIter::get_next_batch_with_cache(const int64_t capacity,
- int64_t &count) {
- int ret = OB_SUCCESS;
- count = 0;
- int64_t available_count =
- own_cache_data_->total_count_ - (cache_read_idx_ + last_read_count_ - 1 + 1);
- adlog::DEBUG("ObTextRetrievalTokenIter::get_next_batch_with_cache start");
- adlog::DEBUG("available_count: ", available_count);
- bool have_to_more_data_flag = false;
- // 这里就是判断本次阅读的数量是否够,如果不够的话,就判断一下是否还可以让对应的迭代器接着去读数据存数据
- // is_completed_这个就是表示迭代器是否扫描结束了、
- if (available_count < capacity and (!own_cache_data_->is_completed_)) {
- ret = load_more_data_to_cache(capacity - available_count);
- if (ret == OB_ITER_END) {
- own_cache_data_->is_completed_ = true;
- adlog::DEBUG("map里的缓存迭代器走到头了");
- ret = OB_SUCCESS;
- }
- } else{
- adlog::DEBUG("起到了缓存的效果");
- }
- available_count = own_cache_data_->total_count_ - (cache_read_idx_ + last_read_count_ - 1 + 1);
- int64_t read_count = std::min(available_count, capacity);
- adlog::DEBUG("token缓存里面: cache_read_idx_: ", cache_read_idx_," read_count: ",read_count, " total: ",own_cache_data_->total_count_);
- if (read_count <= 0) {
- ret = OB_ITER_END;
- } else {
- count = read_count;
- cache_read_idx_ += last_read_count_;
- last_read_count_ = read_count;
- }
- return ret;
- }
- int ObTextRetrievalTokenIter::load_more_data_to_cache(int64_t need_data_cnt) {
- // 获取缓存管理器实例
- adlog::DEBUG("开始尝试让缓存追加数据");
- int ret = OB_SUCCESS;
- TokenCacheManager &cache_mgr = TokenCacheManager::get_instance();
- // 获取存储的原始迭代器
- ObTextRetrievalTokenIter *origin_iter = cache_mgr.get_origin_iter(token_name_);
- if (OB_ISNULL(origin_iter)) {
- ret = OB_ERR_UNEXPECTED;
- LOG_WARN("failed to get origin iter from cache manager", K(ret),
- K(token_name_));
- adlog::DEBUG("怎么会 origin_iter 会 空指针");
- } else {
- // 获取或创建缓存数据
- TokenCacheData *cache_data = cache_mgr.get_cache_data(token_name_);
- if (OB_ISNULL(cache_data)) {
- adlog::DEBUG("ERROR! 竟然没有cache_data");
- }
- // 从原始迭代器扫描数据并添加到缓存
- int64_t loaded_count = 0;
- while (OB_SUCC(ret) && loaded_count < need_data_cnt) {
- int64_t batch_count = 0;
- // int64_t capacity = std::min(need_data_cnt - loaded_count, origin_iter->max_batch_size_);
- if (OB_FAIL(origin_iter->get_next_batch(origin_iter->max_batch_size_, batch_count))) {
- if (OB_UNLIKELY(OB_ITER_END != ret)) {
- LOG_WARN("failed to get next batch from origin iter", K(ret));
- } else {
- // ret = OB_SUCCESS;
- adlog::DEBUG("map里的 缓存迭代器 到的末尾");
- break; // 到达末尾,正常结束
- }
- } else if (batch_count > 0) {
- // 提取数据并添加到缓存
- if (OB_FAIL(extract_and_add_data_to_cache(*origin_iter, *cache_data, batch_count))) {
- LOG_WARN("failed to extract and add data to cache", K(ret));
- } else {
- loaded_count += batch_count;
- }
- }
- }
- if (OB_SUCC(ret)) {
- LOG_DEBUG("successfully loaded data to cache", K(loaded_count), K(token_name_));
- adlog::DEBUG("应该是追加成功了 loaded_count: ", loaded_count, " token_name_: ", token_name_);
- }
- }
- return ret;
- }
复制代码 关于缓存的大概内容就是这样,按道理讲应该是对的? 但是很可惜有问题。
如果这个实现出来了,QPS应该会跑到600那里的,对应到决赛榜单就会是第19名,堪堪进去。
postscript
最终,这个比赛结束了。失败总会贯穿人生始终。来年大概也不会参加这个比赛了,两个队友要考研,我应该到时候会找实习 or work吧。
感谢我的队友,让我这次比赛可以专注于内核方面,rag赛道我是一点也不知道啊。
其实本来以为进20无望,但是实现出来了topn下推,以及rag赛道分数突然高了起来,导致最终又有了动力去搞,虽然结局并不美好...
最终还献祭了自己挂了一科阿巴阿巴。
其实这个比赛,前半个多月,我都没有什么进展,卡index merge的bug也卡了很久。一开始也打算去实现一些谓词过滤的下推,还没有实现出来。
这是第一次去阅读这么大型的代码,其实个人感觉貌似没有学到很多数据库的知识。倒是感觉到了ob数据库代码一些设计是很牛的。
其实学到更多的,就是看这些代码,去理解他是怎么跑起来的,怎么做可以让他跑得更快。
总的来说,其实还是很有意思的。说不定明年还有机会可以再参加一次呢?
接下来要考虑找实习的内容了,希望自己可以有一个不错的结果。
来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作! |