Skip to content

ElasticSearch原理

一、基础分布式架构

1、ElasticSearch对复杂分布式机制的透明隐藏特性 分布式为了应对大量数据 (1)隐藏了分片机制 (2)cluster discovery 集群发现机制 (3)shard负载均衡 (4)shard副本 (5)请求路由 (6)集群扩容 (7)shard重分配

2、ElasticSearch的垂直扩容与水平扩容 (1)垂直扩容 替换服务器 (2)水平扩容 增加服务器(常用)

3、增减节点时数据rebalance 保持负载均衡

4、mastar节点 (1)管理es集群的元数据,索引创建和删除,维护索引元数据 (2)默认情况下,自动选举出mastar

5、节点平等的分布式架构 (1)节点对等,每个节点都能接收所有请求 (2)自动路由请求 (3)响应效率

二、 shard & replica机制

1、一个index包含一个或多个shard 2、每个shard是最小的工作单元,承载部分数据, Lucene实例,完整的创建索引和处理请求的能力 3、增减节点时,shard会自动在nodes中负载均衡 4、primary shard和replica shard,每个document肯定只存在于某一个primary shard 以及对应的replica shard中,不可能存在于多个primary shard中 5、replica shard 是primary shard 的副本,负责容错,以及承担读请求负载 6、primary shard数量在创建索引的时候就固定了,replica shard的数量可以随时更改 7、primary shard默认数量5,replica shard默认是1,默认有10个shard 5 primary shard 5 replica shard 8、primary shard 不能和自己的replica shard放在同一个节点上,否则节点宕机,primary shard 和replica shard 都丢失,起不到容错作用,但是,可以和其他primary shard的replica shard 放在同一个节点上

三、 单台node环境下创建index

1、单台node,创建index,有3个primary shard 3 个replica shard

PUT /tianmao
{
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1
  }
}

GET /tianmao

2、集群status是yellow 3、这时候,只会将3个primary shard分配到仅有的一个node上,另外3个replica shard无法分配 4、集群可以正常工作,当是一旦出现节点宕机,数据全部丢失,而且集群不可用,无法承接任何请求

四、 2台node环境下创建index

1、有3个primary shard 3 个replica shard 2、primary shard 同步 replica shard 3、primary shard replica shard 都可以处理读请求

五、横向扩容

1、primary & replica自动负载均衡 6 个shard = 3 primary + 3 replica 2、每个node有更少的shard,IO/CPU/Memory资源给每个shard分配的更多,每个shard性能更好 3、扩容的极限,6个shard(3 primary + 3 replica),最多扩容到6台机器, 每个shard可以占用的单台服务器的所有资源,性能最好 4、超出扩容极限,动态修改replica数量,9个shard(3 primary + 6 replica), 扩容到9台机器,比3台机器拥有3倍的读吞吐量,最多容纳2台机器宕机 5、3台机器下,9个shard(3 primary + 6 replica)资源更少,但是容错性更好, 6个shard 容忍1台宕机 6、服务器宕机,保证数据不丢失

六、Elasticsearch容错机制

9 shard 3 node

Node1Node2Node3
P0R1R2
P1R2R0
P2R0R1

1、master宕机,自动master选举 2、replica容错,提升为primary,status=yellow 3、重启node,数据恢复,使用原有shard,同步发生修改的部分 status=green

分布式文档系统

一、document的核心元数据

1、index元数据 (1)代表document 存放在那个index (2)类似的数据放在一个索引,field字段大致相同 (3)index中包含很多类似的document (4)索引名称必须是小写,不能用下划线开头,不能包含逗号

2、type元数据 (1)代表document属于index中的类别type (2)一个索引通常划分为多个type,逻辑上对index中对些许不同的数据进行分类 (3)type名可以是大写,小写,但是不能用下划线开头,不能包含逗号

3、id元数据 (1)代表document的唯一标识,与index和type一起,唯一定位一个document (2)可以手动指定,也可以不指定,由es自动生成

二、document id生成两种方式解析

1、手动指定

PUT /index/type/id
{
  "key": "value"
}

从其他数据库导入可以使用原有id

2、自动生成

POST /index/type
{
  "key": "value"
}

自动生成的id 长度为20个字符,url安全,base64编码 GUID,分布式系统并行生成是不会冲突

三、source元数据

1、source元数据,默认情况下会原封不动返回

2、自定义返回字段

PUT /shop/goods/1
{
  "name": "iphone",
  "price": 6000
}

# 返回全部字段
GET /shop/goods/1

# 返回部分字段,多个字段逗号隔开
GET /shop/goods/1?_source=name

四、document操作

1、document全量替换 (1)语法格式与创建文档一样,如果不存在则创建,否则全量替换 (2)document是不可变的,如果要修改document内容,其中一种方式就是全量替换, 直接对document重建索引,替换所有内容 (3)es会将旧的document标记为deleted,新增我们的document, 当我们创建越来越多的document时,es会自动删除标记为deleted的document

2、document强制创建 (1)创建文档语法与全量替换一样,只想新建文档,不想替换文档时使用 (2)创建语法

PUT /index/type/id?op_type=create

or

PUT /index/type/id/_create

3、document删除 (1)语法

DELETE /index/type/id

(2)不会物理删除,只会将其标记为deleted,当数据越来越多的时候,后台自动删除(lazy delete机制)

五、Elasticsearch并发冲突问题

1、并发冲突会导致数据不准确

2、悲观锁并发控制方案 (1)常见于mysql (2)只有一个线程能读数据,写回去之后再释放所锁,期间其他线程不能读取数据 (3)行级锁,表级锁,读锁,写锁 (4)优点:方便,直接加锁 (5)缺点:并发能力低

3、乐观锁并发控制方案(es方案) (1)乐观锁不加锁,每个线程都可以操作 (2)写入之前比对version,如果不同,重新读取数据操作后再写入 (3)优点:并发能力高 (4)缺点:操作麻烦,每次都要比对版本号

4、es基于version进行乐观锁并发控制 (1)version版本号元数据 (2)第一次创建document,版本号是1,修改或删除都自加1 (3)document删除之后,不会物理删除,再次写入会在原先版本号之上自增1 (4)primary和replica之间同步是多线程异步,基于version版本号并发控制 (5)高版本先到,低版本后到,后到达的低版本会被丢弃,新数据不会被旧的数据覆盖

5、es基于version进行乐观锁并发控制示例

两个线程对同一条商品信息进行更新操作过程

# 1、查询数据
GET /shop/goods/1
{
  "_version": 1,
  "_source": {
    "name": "iphone",
  }
}

# 2、先进行更新 成功
POST /shop/goods/1?version=1
{
  "name":"xiaomi"
}

# 3、后更新 失败
POST /shop/goods/1?version=1
{
  "name":"huawei"
}

# 4、查询最新版本号
GET /shop/goods/1
{
  "_version": 2,
  "_source": {
    "name": "xiaomi",
  }
}

# 5、用最新的版本号再次更新 成功
POST /shop/goods/1?version=2
{
  "name":"huawei"
}

6、基于external version进行乐观锁并发控制

es提供的version 一致 时进行修改,不一样报错

?version=1

当external提供的版本号比es中的版本号 大 时才进行修改

?version=1&version_type=external

例如:

# 1、添加一条数据
PUT /shop/goods/1
{
  "name": "iphone"
}

# 2、查看数据版本号,此时verison=1
GET /shop/goods/1

# 3、用外部版本号为2的数据去更新 成功
PUT /shop/goods/1?version=2&version_type=external
{
  "name": "xiaomi"
}

# 4、再次用外部版本号为2的数据去更新 失败
PUT /shop/goods/1?version=2&version_type=external
{
  "name": "huawei"
}

# 5、查看版本号,此时版本号version=2,使用外部版本号verison=4的数据去更新 成功
PUT /shop/goods/1?version=4&version_type=external
{
  "name": "huawei"
}

# 并且最后的版本号显示为version=4

六、update实现原理

1、update 全量更新 创建 & 替换文档一样的语法

PUT /index/type/id

一般应用程序执行流程 (1)应用程序发起GET请求,获取document,展示到前台 (2)用户前台修改的数据,发送到后台 (3)后台代码将用户修改的数据在内存中执行,然后封装好修改后的数据 (4)后台发起PUT请求,到es中,进行全量更新 (5)es将旧的document标记为deleted,然后重新创建一个新的document

2、partial update 局部更新

PUT /index/type/id/_update
{
  data
}

3、es内部执行流程 (1)先获取document (2)将新的字段值更新到document中 (3)将旧的document标记为deleted (4)将修改的document创建出来

4、局部更新优点 (1)所有查询、修改操作发生在es的shard内部,避免网络开销,提升效率 (2)减少并发冲突

5、局部更新数据示例

# 写入数据
PUT /shop/goods/1
{
  "name": "iphone"
}

# 局部更新
POST /shop/goods/1/_update
{
  "doc": {
    "price": 36
  }
}

# 查看数据
GET /shop/goods/1

6、基于groovy脚本进行partial update (1)内置脚本

# 准备数据
PUT /shop/goods/1
{
  "name": "iphone",
  "price": 10
}

# 脚本更新数据
POST /shop/goods/1/_update
{
  "script": "ctx._source.price+=1"
}

# 查看更新结果
GET /shop/goods/1
{
  "_source": {
    "name": "iphone",
    "price": 11
  }
}

(2)外部脚本

es安装目录下新建脚本文件 elasticsearch-5.2.0/config/scripts/add-shop-goods-price.groovy

ctx._source.price+=1

执行更新

POST /shop/goods/1/_update
{
  "script": {
    "lang": "groovy", 
    "file":"add-shop-goods-price"
  }
}

# 查看更新结果
GET /shop/goods/1
{
  "_source": {
    "name": "iphone",
    "price": 12
  }
}

(3)通过外部脚本删除文档 新建脚本文件 elasticsearch-5.2.0/config/scripts/delete-document.groovy

ctx.op = ctx._source.price == price ? 'delete' : 'none'
POST /shop/goods/1/_update
{
  "script": {
    "lang": "groovy",
    "file": "delete-document",
    "params": {
      "price": 12
    }
  }
}

# 查看删除结果
GET /shop/goods/1
{
  "_index": "shop",
  "_type": "goods",
  "_id": "1",
  "found": false
}

(4)upsert 操作 如果document存在则执行更新操作,不存在则执行upsert操作

# 1、新建数据
PUT /shop/goods/1
{
  "name": "iphone",
  "price": 10
}

# 2、查看数据
GET /shop/goods/1
{
  "_version": 1,
  "_source": {
    "name": "iphone",
    "price": 10
  }
}

# 3、更新数据
POST /shop/goods/1/_update
{
  "doc": {
    "name": "huawei"
  }, 
  "upsert": {
    "name": "xiaomi"
  }
}

# 4、查看更新的数据version=2,name=huawei
GET /shop/goods/1
{
  "_version": 2,
  "_source": {
    "name": "huawei",
    "price": 10
  }
}

# 5、删除数据
DELETE  /shop/goods/1

# 6、再次更新数据
POST /shop/goods/1/_update
{
 "doc": {
    "name": "huawei"
    }, 
  "upsert": {
    "name": "xiaomi"
  }
}

# 7、查看数据,vesion=4,name=xiaomi
GET /shop/goods/1
{
  "_version": 4,
  "_source": {
    "name": "xiaomi"
  }
}

7、partial update内置乐观锁并发控制 可以设置重试次数

POST  taobao/product/1/_update?version=1&retry_on_conflict=5
{
  "doc": {
    "name": "red cat"
  }
}

七、批量查询

1、批量查询的好处 如果一条一条的查询,查询100条就要发送100次网络请求,开销较大 如果批量查询,查询100条数据,只用发送1次网络请求,减少网络开销

2、单条查询

GET tianmao/product/1

3、mget 语法

GET  /_mget
{
  "docs": [{
    "_index": "taobao",
    "_type": "product",
    "_id": 1
  },
  {
    "_index": "tianmao",
    "_type": "product",
    "_id": 1
  }
  ]
}

4、查询同一个index下不同type数据

GET  taobao/_mget
{
  "docs": [
    {
    "_type": "product",
    "_id": 1
  },
  {
    "_type": "product",
    "_id": 2
  }
  ]
}

5、查询同一个index下,同type数据

GET  taobao/product/_mget
{
  "ids": [1, 2]
}

6、mget很重要 如果一次查询多条数据,那么久使用mget,减少网络开销

七、批量操作

1、bulk语法

每次操作有两个json串,放在bulk里

POST /_bulk
{"action": {"metadata"}}
{"data"}

例如 创建一个文档

{"index": {"_index": "taobao", "_type": "product", "_id": "1"}}
{"name": "Dog", "price": 25}

2、可以执行bulk操作的类型 (1)delete 删除文档 (2)create 强制创建 PUT /index/type/id/_create (3)index 普通PUT,创建文档,也可是全量替换文档 (4)update 执行的partial update操作

注意: 单个json串不能换行,json串之间必须换行 任意一个操作失败,不会影响其他操作,会在结果中显示

示例

POST _bulk
{"create": {"_index": "taobao", "_type": "product", "_id": 4}}
{"name": "Pig", "price": 36}
{"delete": {"_index": "taobao", "_type": "product", "_id": 1}}
{"index": {"_index": "taobao", "_type": "product", "_id": 5}}
{"name": "Pig", "price": 45}
{"update": {"_index": "taobao", "_type": "product", "_id": 5}}
{"doc": {"price": 54}}

3、指定index

POST taobao/_bulk
{"create": {"_type": "product", "_id": 4}}
{"name": "Pig", "price": 36}

4、指定index 和type

POST taobao/product/_bulk
{"create": {"_id": 4}}
{"name": "Pig", "price": 36}

5、bulk性能 bulk request会加载到内存中,如果太大反而会降低性能,因此需要反复尝试一个最佳的bulk size 一般从1000~5000条数据开始,尝试增加 如果看大小,最好在5~15MB之间

八、阶段总结

1、阶段性总结 1-8 快速入门,基本原理,基本操作 9-13 ES分布式的基本原理, 14-27 document讲解

ElasticSearch 最核心功能 分布式文档的数据存储系统

分布式 distributed document store 文档数据 可以存储json格式的文档,核心数据结构 存储系统 对数据进行存储,查询,创建,更新,删除

一个NoSQL存储系统 操作document

可以开发的应用程序 (1)数据量较大,快速进行扩容,承载大量数据 (2)数据结构灵活操作,不用去处理传统数据库中的表 (3)对数据操作较为简单,基本的增删改查 (4)NoSQL数据库,也适用以上场景

九、document数据路由原理

1、document路由,数据路由 一个index会被分成多片,每片都在一个shard中, 一个document,只能存在一个shard中 客户端创建document的时候,es就会决定document存放在哪个shard上

2、路由算法

shard = hash(routing) % number_of_primary_shards

举例: 一个index 有3个primary shard P0, P1, P2 (1)增删改查document时候,会有一个routing number 默认 routing = id (2)传入hash函数产生routing的hash值 hash(routing) = 21 (3)hash值 对index 的primary shard数量取余 21 % 3 =0 (4)就决定了这个document 放在 P0之上

决定document 在哪个shard上,最重要的值是routing,默认是id,也可以手动指定 相同的routing值,每次求hash再求余,结果也一样 结果一定在 0 ~ number_of_primary_shards - 1之间

默认的routing是id,也可以手动指定 以便后续进行应用级别的负载均衡,以及提升批量读取的性能也很有帮助

PUT /index/type/id?routing=<user_id>

基于document路由算法,primary shard建立之后就不能修改,但是replica shard可以修改

十、document增删改内部原理

1、客户端选择一个node发送请求过去,这个node就是coordinating node(协调节点) 2、coordinating node 对document进行路由,将请求转发给对应的node(有primary shard) 3、实际的node上primary shard处理请求,然后将数据同步到replica shard 4、coordinating node 如果发现primary node 和所属 replica node都搞定之后,就放回响应结果给客户端

例如 3个节点,3个primary shard replica=1, replica shard=3 总共6个shard

client 要创建一个document docuemnt可以发送任意一个node

增删改操作只能由primary shard操作,会自动同步到replica sahrd

十一、分布式文档系统

1、consistency

发送增删改操作的时候,可以带上一个consistency参数,指明我们想要的写一致性

PUT /index/type/id?consistency=quorum

one(primary shard): 写操作,只要primary shard是active就可以执行 all(all shard): 写操作,必须所有priamry shard和replica shard都是active才执行 quorum(default): 默认值,要求所有的shard,大部分shar都是活跃的才可以执行写操作

2、quorum机制 写之前必须确保大多数shard都可用,当number_of_replica > 1时才生效

计算公式

quorum = int((primary + number_of_replica)/2) + 1

举例: primary_shard=3, number_of_replica=1, 总shard=6 quorum = int((3 + 1)/2) + 1 = 3

所以6个shard,至少3个shard处于active,才可以执行写操作

3、如果节点数量少于quorum数量,可能导致quorum不齐全,导致无法执行写操作 primary 和replica 必须在不同节点上,如果两个机器,有可能出现3个shard分配不齐全, 此时可能回出现写操作无法执行

例如: node=2, primay=1 replica=3 quorum = int((primay + replica)/2) + 1 = 3 node1 -> primary node2 -> replica1 所以active=2 < quorum=3, 不满足quorum机制

提供了一个特殊场景,就是number_of_replica>1才生效 如果不做处理,单节点就无法工作

4、quorum不齐全时,wait,默认1分钟,timeout 写操作可以设置timeout参数,单位 毫秒

PUT /index/type/id?timeout=30

十二、document查询内部原理

1、客户端发送请求到任意一个node

2、coordinating node接收到document读请求后,进行路由,转发给对应的node, 随机轮询算法在primary shard 和 replica shard中随机选一个shard,使得负载均衡

3、接收请求的node返回document给coordination node

4、coordinating node返回document给客户端

5、特殊情况,document如果还在建立索引过程中,可能只有primary shard有数据,replica shard没数据, 可能导致无法读取到document,但是document完成索引建立之后,primary shard和replica shard就都有数据了

对于读请求,不一定将请求转发到primary shard也可以转发到replica shard replica shard也可以处理所有的读请求

使用round-robin 随机轮询算法 比如说: coordinating node接收到一个document的4次查询,就会使用算法 将2次查询请求转发给P1,将2次查询请求转发给R1 尽量让primary shard 和所有的replica shard均匀服务读请求,使得负载均衡

十三、bulk api json格式与底层性能

bulk api Json格式

{"action": "meta"}
{"data"}

1、bulk中的每个操作都可能要转发到不同node的shard去执行

2、如果采用比较良好的json数组格式 (1)将json数组解析为JSONArray对象 文本->JsonArray对象 (2)对每个请求中的document进行路由 (3)为路由到同一个shard上的多个请求,创建一个请求数组 (4)将序列化后的数据发送到对应的节点上去

3、耗费更多内存,更多的jvm开销 bulk size 最佳大小,如果单次传输数据过多,内存占用过多,性能下降,垃圾回收耗费时间

4、现在的方式 (1)直接按照换行符切割json (2)对每两个一组的json,读取mete,进行document路由 (3)直接将对应的json发送到node上去

5、最大的优势在于,不需要将json数组解析为一个JSONArray对象, 形成一份大数据拷贝,避免了内存空间的浪费