Skip to content

写流程

在ES中,写入单个文档的请求称 为Index请求,批量写入的请求称为Bulk请求。写单个和多个文档使用相同的处理逻辑,请求被统一封装为BulkRequest。

文档操作的定义

在ES中,对文档的操作有下面几种类型:

java
enum OpType {
INDEX(0),
CREATE(1),
UPDATE(2),
DELETE(3);
}
  • INDEX:向索引中“put”一个文档的操作称为“索引”一个文档。 此处“索引”为动词。
  • CREATE:put 请求可以通过 op_type 参数设置操作类型为 create,在这种操作下,如果文档已存在,则请求将失败。
  • UPDATE:默认情况下,“put”一个文档时,如果文档已存在, 则更新它。
  • DELETE:删除文档。 在put API中,通过op_type参数来指定操作类型。

可选参数

Index/Bulk基本流程

新建、索引(这里的索引是动词,指写入操作,将文档添加到 Lucene的过程称为索引一个文档)和删除请求都是写操作。写操作必须先在主分片执行成功后才能复制到相关的副分片。

以下是写单个文档所需的步骤:

  • (1)客户端向NODE1发送写请求。
  • (2)NODE1使用文档ID来确定文档属于分片0,通过集群状态 中的内容路由表信息获知分片0的主分片位于NODE3,因此请求被转发到NODE3上。
  • (3)NODE3上的主分片执行写操作。如果写入成功,则它将请 求并行转发到 NODE1和NODE2的副分片上,等待返回结果。当所有的副分片都报告成功,NODE3将向协调节点报告成功,协调节点再向客户端报告成功。

在客户端收到成功响应时,意味着写操作已经在主分片和所有副 分片都执行完成。

写一致性的默认策略是quorum,即多数的分片(其中分片副本 可以是主分片或副分片)在写入操作时处于可用状态。

quorum = int( (primary + number_of_replicas) / 2 )+ 1

Index/Bulk详细流程

以不同角色节点执行的任务整理流程如下图所示。

协调节点流程

协调节点负责创建索引、转发请求到主分片节点、等待响应、回 复客户端。

实现位于TransportBulkAction。执行本流程的线程池: http_server_worker。

1. 参数检查

如同我们平常设计的任何一个对外服务的接口处理一样,收到用 户请求后首先检测请求的合法性,把检查操作放在处理流程的第一 步,有问题就直接拒绝,对异常请求的处理代价是最小的。

检查操作进行以下参数检查,如下表所示。

2. 处理pipeline请求

数据预处理(ingest)工作通过定义pipeline和processors实 现。pipeline是一系列processors的定义,processors按照声明的顺序执行。

3. 自动创建索引

如果配置为允许自动创建索引(默认允许),则计算请求中涉及 的索引,可能有多个,其中有哪些索引是不存在的,然后创建它。如果部分索引创建失败,则涉及创建失败索引的请求被标记为失败。其他索引正常执行写流程。

创建索引请求被发送到Master节点,待收到全部创建请求的 Response(无论成功还是失败的)之后,才进入下一个流程。 Master节点什么时候返回Response?在Master节点执行完创建索引流程,将新的clusterState发布完毕才会返回。那什么才算发布完毕呢?默认情况下,Master发布clusterState的Request收到半数以上的节点Response,认为发布成功。负责写数据的节点会先执行一遍内容路由的过程以处理没有收到最新clusterState的情况。

4. 对请求的预先处理

这里不同于对数据的预处理,对请求的预先处理只是检查参数、 自动生成id、处理routing等。

由于上一步可能有创建索引操作,所以在此先获取最新集群状态 信息。然后遍历所有请求,从集群状态中获取对应索引的元信息,检查mapping、routing、id等信息。如果id不存在,则生成一个UUID作为文档id。实现位于TransportBulkAction.BulkOperation#doRun。

5. 检测集群状态

协调节点在开始处理时会先检测集群状态,若集群异常则取消写 入。例如,Master节点不存在,会阻塞等待Master节点直至超时。

因此索引为Red时,如果Master节点存在,则数据可以写到正常 shard,Master节点不存在,协调节点会阻塞等待或取消写入。

6. 内容路由,构建基于shard的请求

将用户的 bulkRequest 重新组织为基于 shard 的请求列表。例如,原始用户请求可能有10个写操作,如果这些文档的主分片都属于同一个,则写请求被合并为1个。所以这里本质上是合并请求的过程。此处尚未确定主分片节点。

7. 路由算法

路由算法就是根据routing和文档id计算目标shardid的过程。

一般情况下,路由计算方式为下面的公式:

shard_num = hash(_routing) % num_primary_shards 默

认情况下,_routing值就是文档id

ES使用随机id和Hash算法来确保文档均匀地分配给分片。当使用 自定义id或routing时, id 或 routing 值可能不够随机,造成数据倾斜,部分分片过大。在这种情况下,可以使用index.routing_partition_size 配置来减少倾斜的风险。 routing_partition_size 越大,数据的分布越均匀。

在设置了index.routing_partition_size的情况下,计算公式为:shard_num = ( hash ( _routing ) + hash ( _id ) % routing_partition_size ) % num_primary_shards 也 就 是 说 ,_routing字段用于计算索引中的一组分片,然后使用_id来选择该组内的分片。

index.routing_partition_size 取 值 应 具 有 大 于 1 且 小 于 index.number_of_shards的值。

8. 转发请求并等待响应 主要是根据集群状态中的内容路由表确定主分片所在节点,转发 请求并等待响应。遍历所有需要写的 shard,将位于某个 shard 的请求封装为BulkShardRequest 类 , 调用 TransportShardBulkAction#execute执行发送,在listener中等待响应,每个响应也是以shard为单位的。

如果某个shard的响应中部分doc写失败了,则将异常信息填充到Response中,整体请求做成功处理。待收到所有响应后(无论成功还是失败的),回复给客户端。

7.4.2 主分片节点流程

执行本流程的线程池:bulk。

主分片所在节点负责在本地写主分片,写成功后,转发写副本片 请求,等待响应,回复协调节点。

1. 检查请求

主分片所在节点收到协调节点发来的请求后也是先做了校验工 作,主要检测要写的是否是主分片,AllocationId 是否符合预期,索引是否处于关闭状态等。

2. 是否延迟执行

判断请求是否需要延迟执行,如果需要延迟则放入队列,否则继 续下面的流程。

3. 判断主分片是否已经发生迁移

如果已经发生迁移,则转发请求到迁移的节点。

4. 检测写一致性

在开始写之前,检测本次写操作涉及的shard,活跃shard数量是 否足够,不足则不执行写入。默认为1,只要主分片可用就执行写入。

5. 写Lucene和事务日志

遍历请求,处理动态更新字段映射,然后调用InternalEngine#index逐条对doc进行索引。

Engine封装了Lucene和translog的调用,对外提供读写接口。

在写入Lucene之前,先生成Sequence Number和Version。这 些都是在InternalEngine类中实现的。Sequence Number每次递增1,Version根据当前doc的最大版本加1。

索引过程为先写Lucene,后写translog。

因为Lucene写入时对数据有检查,写操作可能会失败。如果先写 translog,写入Lucene 时失败,则还需要对translog进行回滚处理。

6. flush translog

根据配置的translog flush策略进行刷盘控制,定时或立即刷盘

7. 写副分片

现在已经为要写的副本shard准备了一个列表,循环处理每个 shard,跳过unassigned状态的shard,向目标节点发送请求,等待响应。这个过程是异步并行的。

转 发 请 求 时 会 将 SequenceID 、 PrimaryTerm 、 GlobalCheckPoint、version等传递给副分片。

replicasProxy.performOn ( shard, replicaRequest, globalCheckpoint,...);

在等待Response的过程中,本节点发出了多少个Request,就要 等待多少个Response。无论这些Response是成功的还是失败的,直到超时。

收集到全部的Response后,执行finish()。给协调节点返回消 息,告知其哪些成功、哪些失败了。

8. 处理副分片写失败情况

主分片所在节点将发送一个shardFailed请求给Master, 然后Master会更新集群状态,在新的集群状态中,这个shard 将:

  • 从in_sync_allocations列表中删除;
  • 在routing_table的shard列表中将state由STARTED更改为 UNASSIGNED;
  • 添加到routingNodes的unassignedShards列表。

副分片节点流程

执行本流程的线程池:bulk。

执行与主分片基本相同的写doc过程,写完毕后回复主分片节点。

在副分片的写入过程中,参数检查的实现与主分片略有不同,最 终都调用 IndexShardOperationPermits#acquire判断是否需要delay,继续后面的写流程。

I/O异常处理

在一个shard上执行的一些操作可能会产生I/O异常之类的情况。 一个shard上的CRUD等操作在ES里由一个Engine对象封装,在 Engine处理过程中,部分操作产生的部分异常ES会认为有必要关闭此Engine,上报Master。例如,系统I/O层面的写入失败,这可能意味着磁盘损坏。

对Engine异常的捕获目前主要通过IOException实现。

注意: 其中不包含get操作,也就是说,读取doc失败不会触发 shard迁移。

异常流程

  • (1)如果请求在协调节点的路由阶段失败,则会等待集群状态更新,拿到更新后,进行重试,如果再次失败,则仍旧等集群状态更新,直到超时1分钟为止。超时后仍失败则进行整体请求失败处理。
  • (2)在主分片写入过程中,写入是阻塞的。只有写入成功,才会发起写副本请求。如果主shard写失败,则整个请求被认为处理失败。 如果有部分副本写失败,则整个请求被认为处理成功。
  • (3)无论主分片还是副分片,当写一个doc失败时,集群不会重试,而是关闭本地shard,然后向Master汇报,删除是以shard为单位的。

系统特性

ES本身也是一个分布式存储系统,如同其他分布式系统一样,我 们经常关注的一些特性如下。

  • 数据可靠性:通过分片副本和事务日志机制保障数据安全。
  • 服务可用性:在可用性和一致性的取舍方面,默认情况下 ES 更倾向于可用性,只要主分片可用即可执行写入操作。
  • 一致性:笔者认为是弱一致性。只要主分片写成功,数据就可能 被读取。因此读取操作在主分片和副分片上可能会得到不同结果。
  • 原子性:索引的读写、别名更新是原子操作,不会出现中间状 态。但bulk不是原子操作,不能用来实现事务。
  • 扩展性:主副分片都可以承担读请求,分担系统负载。

Released under the MIT License.