Doris stream load

2023-06-13

Stream Load是Doris的一种同步的导入方式, 允许用户通过Http访问的方式批量地将CSV或者JSON数据导入Doris,并返回数据导入的结果。用户可以直接通过Http请求的返回体判断数据导入是否成功,也可以通过在客户端执行查询SQL来查询历史任务的结果。Stream Load是是最常用的一种导入方式,在小米内部占了约80%以上场景。

执行过程

用户执行stream load主要有两种方式:

  • 将请求直接提交给be,并由该节点作为本次stream load任务的coordinator。

  • 将http请求提交给fe,fe再通过http重定向将数据导入请求转发给某一个be节点,该be节点作为本次stream load任务的coordinator,此时的fe主要起到请求转发的作用。

本文中主要介绍第二种方式。其主要执行流程如下:

  • 用户提交stream load请求到fe

  • fe对http请求进行解析,然后进行鉴权。鉴权通过后,根据策略选取一台be作为coordinator,并将stream load请求转发给coordinator be(StreamLoadAction::on_header

  • coordinator be在接到请求后,会对其header信息进行校验,包括body长度、format类型等。

  • coordinator be向fe发送begin transaction的请求,fe在接收到该请求时会开启一个事务,并向coordinator be返回事务id

  • coordinator be向fe发送TStreamLoadPutRequest请求,fe在接收到该请求时,会产生导入执行计划,并向coordinator be返回。该执行计划非常简单,由OlapTableSinkBrokerLoadScanNode两个算子组成,且只有一个PlanFragment

  • coordinator be在接收到导入计划之后,开始执行导入计划。OlapTableSink会根据数据选取对应的tablet,并将其放入该tablet所在be对应的channel,后台会有线程定期的将channel中的数据通过brpc发送到对应be(根据配置olap_table_sink_send_interval_ms)。其他be在接收到该PTabletWriterAddBatchRequest请求后,会执行数据写入操作

  • 在导入完之后,会根据导入执行状态,决定是commit或者rollback transaction

整体执行流程如下图所示:

数据接收

当doris接收到用户提交的stream load请求,通过StreamLoadAction::on_chunk_data接收http请求中的数据,并将数据append到该stream load对应的body_sink中。其中:

  • 对于大部分数据格式,都支持使用use_streaming的方式,则将body_sink指定为StreamLoadPipe,该class主要是将数据缓存起来。

  • 对于少部分不支持streaming的数据格式,则将body_sink指定为MessageBodyFileSink,该class主要将数据存储在本地文件中。

数据导入

BrokerScanNode算子

BrokerScanNode算子在open时,会启动一个线程从streaming或者本地文件中读取数据,存入BrokerScanNode_batch_queue中。

《Doris查询计划》中所讲,查询计划在执行过程中,会自顶向下调用算子的get_next函数。BrokerScanNode算子在get_next时,从_batch_queue中获取一个数据batch。

当执行完BrokerScanNode算子的get_next获取到row_batch之后,会将改row_batch通过OlapTableSink写入表中。

OlapTableSink算子

OlapTableSink算子在prepare阶段,会对每个rollup(包括base table)建立对应的IndexChannel。在IndexChannel中,获取到其所有tablets对应的node信息(每个tablet的所有副本都需要找到对应的node),建立对应的NodeChannel,在NodeChannel中,会根据该node的地址以及brpc port,获取一个brpc stub,用于发送信息至该node。

OlapTableSink算子在open阶段,会创建一个后台线程,依次对IndexChannel及其内部的NodeChannel中的_pending_batches,通过PTabletWriterAddBatchRequest请求发送到对应的be node。

OlapTableSink::send的处理逻辑如下:

  • 如果存在表达式,则根据表达式将input_batch进行转换。

  • 对上述row_batch进行校验,对row_batch的每个row中的每个slot,分别根据类型进行校验,筛选出不符合要求的数据。

  • 通过上述IndexChannel::add_row,首先获取tablet对应的NodeChannel,并逐行地将数据添加到其对应的NodeChannel::_cur_batch中,当_cur_batch中的数据大于BATCH_SIZE_FOR_SEND(2MB)时,则将_cur_batch中数据存入_pending_batches中。上述open阶段创建的线程,会从_pending_batches中依次取出batch,发送至对应的be node

OlapTableSink::close中,将NodeChannel::_cur_batch中的剩余的数据放入_pending_batches中。后续这些数据则会通过上述open阶段创建的线程发送出去。

row batch写入

NodeChannel执行open操作时:

  • 向对应的be node发送PTabletWriterOpenRequest请求,be node打开写入通道。

  • be节点的PInternalServiceImpl在接收到请求时,将请求转发给LoadChannelManager

  • LoadChannelManager根据request中的load_id获取(或创建)一个LoadChannel并执行open操作。对于每个Stream load,都会在其对应的StreamLoadContext中随机生成一个load_id

  • LoadChannel::open则根据index_id获取(或创建)一个TabletsChannel并执行open操作。

  • TabletsChannel::open操作中,会为每个tablet创建一个DeltaWriter,并执行open操作。

  • 对于每一个DeltaWriter,其内部都包含一个memtable和RowsetWriter。当DeltaWriter::write第一次写入时,会执行init操作,在init时会分别对memtable和RowsetWriter进行创建。

其对应关系如下图:

当be node接收到PTabletWriterAddBatchRequest时,开始执行写入操作:

  • PInternalServiceImpl在接收到请求时,将请求转发给LoadChannelManager

  • LoadChannelManager在接收到请求时,首先根据load_id找到对应的LoadChannel,将请求转发给该LoadChannel

  • LoadChannel在接收到请求时,首先根据index_id找到对应的TabletsChannel,将请求转发给该TabletsChannel

  • PTabletWriterAddBatchRequest中记录了每个row对应的tablet id,TabletsChannel根据tablet id,将请求转发给对应的DeltaWriter

  • 每个DeltaWriter中都有一个memtable,DeltaWriter::write将属于该tablet的所有row写入到其memtable中。memtable是由skiplist实现的,排序规则使用了按照schema的key的顺序依次对字段进行比较,这样保证了写入的每一个写入 Segment 中的数据是有序的。对于不同的数据模型,写入操作有不同的实现:

    • 对于duplicate key,则直接将数据写入到skiplist中

    • 对于aggregate key或者unique key,则需要先在skiplist中查询是否有相同key的row,如果有,则对这两个row的所有value列使用对应的聚合函数进行聚合,然后再写入到skiplist中。

  • 当memtable大小超过200MB(可配置值)时,则启动一个后台线程执行flush操作,同时生成一个新的Memtable继续接收新数据的写入。

memtable flush操作流程:

  • 后台线程,通过MemTable::flush执行memtable的flush操作。该函数主要通过调用DeltaWriter中传递过来的RowsetWriterflush_single_memtable来执行flush操作。由于Doris存在两种数据格式,这里主要讲解BetaRowsetWriter

  • BetaRowsetWriter::flush_single_memtable首先创建一个SegmentWriter,并通过SegmentWriter::append_row将row batch中的数据逐行通过SegmentWriter写入。

  • SegmentWriter中会为每个column根据其类型创建一个ColumnWriter,在ColumnWriter中,创建page_builder,对nullable的列创建null bitmap,以及按需创建各类索引builder(包括一级索引、zone map索引、bloom filter索引、bitmap索引)。当写入数据row时,以此调用所有的ColumnWriter写入该row的所有cell,ColumnWriter将数据写入PageBuilder,并对各类索引builder进行更新。当PageBuilder中的数据超过PageBuilderOptions.data_page_size(默认1MB)时,则生成根据当前PageBuilder中的数据生成一个page,放入一个双向链表里。

  • 当SegmentWriter中写入的数据量大于256MB时(或者写入memtable中的最后一批数据),则通过SegmentWriter::finalize通过所有的ColumnWriter::finalize,按照Doris特定的文件格式进行落盘。具体文件格式可以参考

整体的写入流程如下图:

事务管理

下面介绍stream load过程中的事务管理:

  • 如前文说述,当coordinator接收到http请求时,会向fe发送begin_txn请求,开启一个事务,并获取txn_id

  • 当导入完成之后,根据导入状态(OlapTableSink中会根据每个be完成数据导入的状态来判断导入状态,当所有导入都成功时,该次导入状态则为成功),coordinator be决定向fe发送commit_txn或者rollback_txn请求。

  • 当fe接收到commit_txn时,则会记录该txn,后台publish线程则异步地对txn涉及到的所有be发送publish请求。

  • 当be接收到publish请求时,则设置tablet的该rowset版本为可见,并向fe发送response

  • 当fe接收到该txn所有be的response时,在元数据中设置该版本可见。此后,该数据版本将可以被用户查询。

另外,对于每个tablet对应的DeltaWriter,也会开启一个本地的txn,用于记录该txn的状态,具体流程是类似的,只是fe并不参与其中,具体流程如下:

  • DeltaWriter::init时,通过prepare_txn开启一个事务,TxnManager::prepare_txn会在内部的txn信息里添加一条记录。

  • 当写入完成后,DeltaWriter::close时,会执行TxnManager::commit_txn,主要在txn信息里添加上rowset相关信息。

  • 当接收到fe发来的publish请求时,be会启动task完成publish动作。当该task运行时,便会执行TxnManager::publish_txn设置该rowset状态为VISIBLE,随后便将该rowset添加到对应的tablet里。此时,从be的角度来看,该版本数据已经对用户可见。

参考文档

Doris Stream Load原理解析