ElasticSearch原理
一、基础分布式架构
1、ElasticSearch对复杂分布式机制的透明隐藏特性 分布式为了应对大量数据 (1)隐藏了分片机制 (2)cluster discovery 集群发现机制 (3)shard负载均衡 (4)shard副本 (5)请求路由 (6)集群扩容 (7)shard重分配
2、ElasticSearch的垂直扩容与水平扩容 (1)垂直扩容 替换服务器 (2)水平扩容 增加服务器(常用)
3、增减节点时数据rebalance 保持负载均衡
4、mastar节点 (1)管理es集群的元数据,索引创建和删除,维护索引元数据 (2)默认情况下,自动选举出mastar
5、节点平等的分布式架构 (1)节点对等,每个节点都能接收所有请求 (2)自动路由请求 (3)响应效率
二、 shard & replica机制
1、一个index包含一个或多个shard 2、每个shard是最小的工作单元,承载部分数据, Lucene实例,完整的创建索引和处理请求的能力 3、增减节点时,shard会自动在nodes中负载均衡 4、primary shard和replica shard,每个document肯定只存在于某一个primary shard 以及对应的replica shard中,不可能存在于多个primary shard中 5、replica shard 是primary shard 的副本,负责容错,以及承担读请求负载 6、primary shard数量在创建索引的时候就固定了,replica shard的数量可以随时更改 7、primary shard默认数量5,replica shard默认是1,默认有10个shard 5 primary shard 5 replica shard 8、primary shard 不能和自己的replica shard放在同一个节点上,否则节点宕机,primary shard 和replica shard 都丢失,起不到容错作用,但是,可以和其他primary shard的replica shard 放在同一个节点上
三、 单台node环境下创建index
1、单台node,创建index,有3个primary shard 3 个replica shard
PUT /tianmao
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
}
}
GET /tianmao
2、集群status是yellow 3、这时候,只会将3个primary shard分配到仅有的一个node上,另外3个replica shard无法分配 4、集群可以正常工作,当是一旦出现节点宕机,数据全部丢失,而且集群不可用,无法承接任何请求
四、 2台node环境下创建index
1、有3个primary shard 3 个replica shard 2、primary shard 同步 replica shard 3、primary shard replica shard 都可以处理读请求
五、横向扩容
1、primary & replica自动负载均衡 6 个shard = 3 primary + 3 replica 2、每个node有更少的shard,IO/CPU/Memory资源给每个shard分配的更多,每个shard性能更好 3、扩容的极限,6个shard(3 primary + 3 replica),最多扩容到6台机器, 每个shard可以占用的单台服务器的所有资源,性能最好 4、超出扩容极限,动态修改replica数量,9个shard(3 primary + 6 replica), 扩容到9台机器,比3台机器拥有3倍的读吞吐量,最多容纳2台机器宕机 5、3台机器下,9个shard(3 primary + 6 replica)资源更少,但是容错性更好, 6个shard 容忍1台宕机 6、服务器宕机,保证数据不丢失
六、Elasticsearch容错机制
9 shard 3 node
Node1 | Node2 | Node3 |
---|---|---|
P0 | R1 | R2 |
P1 | R2 | R0 |
P2 | R0 | R1 |
1、master宕机,自动master选举 2、replica容错,提升为primary,status=yellow 3、重启node,数据恢复,使用原有shard,同步发生修改的部分 status=green
分布式文档系统
一、document的核心元数据
1、index元数据 (1)代表document 存放在那个index (2)类似的数据放在一个索引,field字段大致相同 (3)index中包含很多类似的document (4)索引名称必须是小写,不能用下划线开头,不能包含逗号
2、type元数据 (1)代表document属于index中的类别type (2)一个索引通常划分为多个type,逻辑上对index中对些许不同的数据进行分类 (3)type名可以是大写,小写,但是不能用下划线开头,不能包含逗号
3、id元数据 (1)代表document的唯一标识,与index和type一起,唯一定位一个document (2)可以手动指定,也可以不指定,由es自动生成
二、document id生成两种方式解析
1、手动指定
PUT /index/type/id
{
"key": "value"
}
从其他数据库导入可以使用原有id
2、自动生成
POST /index/type
{
"key": "value"
}
自动生成的id 长度为20个字符,url安全,base64编码 GUID,分布式系统并行生成是不会冲突
三、source元数据
1、source元数据,默认情况下会原封不动返回
2、自定义返回字段
PUT /shop/goods/1
{
"name": "iphone",
"price": 6000
}
# 返回全部字段
GET /shop/goods/1
# 返回部分字段,多个字段逗号隔开
GET /shop/goods/1?_source=name
四、document操作
1、document全量替换 (1)语法格式与创建文档一样,如果不存在则创建,否则全量替换 (2)document是不可变的,如果要修改document内容,其中一种方式就是全量替换, 直接对document重建索引,替换所有内容 (3)es会将旧的document标记为deleted,新增我们的document, 当我们创建越来越多的document时,es会自动删除标记为deleted的document
2、document强制创建 (1)创建文档语法与全量替换一样,只想新建文档,不想替换文档时使用 (2)创建语法
PUT /index/type/id?op_type=create
or
PUT /index/type/id/_create
3、document删除 (1)语法
DELETE /index/type/id
(2)不会物理删除,只会将其标记为deleted,当数据越来越多的时候,后台自动删除(lazy delete机制)
五、Elasticsearch并发冲突问题
1、并发冲突会导致数据不准确
2、悲观锁并发控制方案 (1)常见于mysql (2)只有一个线程能读数据,写回去之后再释放所锁,期间其他线程不能读取数据 (3)行级锁,表级锁,读锁,写锁 (4)优点:方便,直接加锁 (5)缺点:并发能力低
3、乐观锁并发控制方案(es方案) (1)乐观锁不加锁,每个线程都可以操作 (2)写入之前比对version,如果不同,重新读取数据操作后再写入 (3)优点:并发能力高 (4)缺点:操作麻烦,每次都要比对版本号
4、es基于version进行乐观锁并发控制 (1)version版本号元数据 (2)第一次创建document,版本号是1,修改或删除都自加1 (3)document删除之后,不会物理删除,再次写入会在原先版本号之上自增1 (4)primary和replica之间同步是多线程异步,基于version版本号并发控制 (5)高版本先到,低版本后到,后到达的低版本会被丢弃,新数据不会被旧的数据覆盖
5、es基于version进行乐观锁并发控制示例
两个线程对同一条商品信息进行更新操作过程
# 1、查询数据
GET /shop/goods/1
{
"_version": 1,
"_source": {
"name": "iphone",
}
}
# 2、先进行更新 成功
POST /shop/goods/1?version=1
{
"name":"xiaomi"
}
# 3、后更新 失败
POST /shop/goods/1?version=1
{
"name":"huawei"
}
# 4、查询最新版本号
GET /shop/goods/1
{
"_version": 2,
"_source": {
"name": "xiaomi",
}
}
# 5、用最新的版本号再次更新 成功
POST /shop/goods/1?version=2
{
"name":"huawei"
}
6、基于external version进行乐观锁并发控制
es提供的version 一致 时进行修改,不一样报错
?version=1
当external提供的版本号比es中的版本号 大 时才进行修改
?version=1&version_type=external
例如:
# 1、添加一条数据
PUT /shop/goods/1
{
"name": "iphone"
}
# 2、查看数据版本号,此时verison=1
GET /shop/goods/1
# 3、用外部版本号为2的数据去更新 成功
PUT /shop/goods/1?version=2&version_type=external
{
"name": "xiaomi"
}
# 4、再次用外部版本号为2的数据去更新 失败
PUT /shop/goods/1?version=2&version_type=external
{
"name": "huawei"
}
# 5、查看版本号,此时版本号version=2,使用外部版本号verison=4的数据去更新 成功
PUT /shop/goods/1?version=4&version_type=external
{
"name": "huawei"
}
# 并且最后的版本号显示为version=4
六、update实现原理
1、update 全量更新 创建 & 替换文档一样的语法
PUT /index/type/id
一般应用程序执行流程 (1)应用程序发起GET请求,获取document,展示到前台 (2)用户前台修改的数据,发送到后台 (3)后台代码将用户修改的数据在内存中执行,然后封装好修改后的数据 (4)后台发起PUT请求,到es中,进行全量更新 (5)es将旧的document标记为deleted,然后重新创建一个新的document
2、partial update 局部更新
PUT /index/type/id/_update
{
data
}
3、es内部执行流程 (1)先获取document (2)将新的字段值更新到document中 (3)将旧的document标记为deleted (4)将修改的document创建出来
4、局部更新优点 (1)所有查询、修改操作发生在es的shard内部,避免网络开销,提升效率 (2)减少并发冲突
5、局部更新数据示例
# 写入数据
PUT /shop/goods/1
{
"name": "iphone"
}
# 局部更新
POST /shop/goods/1/_update
{
"doc": {
"price": 36
}
}
# 查看数据
GET /shop/goods/1
6、基于groovy脚本进行partial update (1)内置脚本
# 准备数据
PUT /shop/goods/1
{
"name": "iphone",
"price": 10
}
# 脚本更新数据
POST /shop/goods/1/_update
{
"script": "ctx._source.price+=1"
}
# 查看更新结果
GET /shop/goods/1
{
"_source": {
"name": "iphone",
"price": 11
}
}
(2)外部脚本
es安装目录下新建脚本文件 elasticsearch-5.2.0/config/scripts/add-shop-goods-price.groovy
ctx._source.price+=1
执行更新
POST /shop/goods/1/_update
{
"script": {
"lang": "groovy",
"file":"add-shop-goods-price"
}
}
# 查看更新结果
GET /shop/goods/1
{
"_source": {
"name": "iphone",
"price": 12
}
}
(3)通过外部脚本删除文档 新建脚本文件 elasticsearch-5.2.0/config/scripts/delete-document.groovy
ctx.op = ctx._source.price == price ? 'delete' : 'none'
POST /shop/goods/1/_update
{
"script": {
"lang": "groovy",
"file": "delete-document",
"params": {
"price": 12
}
}
}
# 查看删除结果
GET /shop/goods/1
{
"_index": "shop",
"_type": "goods",
"_id": "1",
"found": false
}
(4)upsert 操作 如果document存在则执行更新操作,不存在则执行upsert操作
# 1、新建数据
PUT /shop/goods/1
{
"name": "iphone",
"price": 10
}
# 2、查看数据
GET /shop/goods/1
{
"_version": 1,
"_source": {
"name": "iphone",
"price": 10
}
}
# 3、更新数据
POST /shop/goods/1/_update
{
"doc": {
"name": "huawei"
},
"upsert": {
"name": "xiaomi"
}
}
# 4、查看更新的数据version=2,name=huawei
GET /shop/goods/1
{
"_version": 2,
"_source": {
"name": "huawei",
"price": 10
}
}
# 5、删除数据
DELETE /shop/goods/1
# 6、再次更新数据
POST /shop/goods/1/_update
{
"doc": {
"name": "huawei"
},
"upsert": {
"name": "xiaomi"
}
}
# 7、查看数据,vesion=4,name=xiaomi
GET /shop/goods/1
{
"_version": 4,
"_source": {
"name": "xiaomi"
}
}
7、partial update内置乐观锁并发控制 可以设置重试次数
POST taobao/product/1/_update?version=1&retry_on_conflict=5
{
"doc": {
"name": "red cat"
}
}
七、批量查询
1、批量查询的好处 如果一条一条的查询,查询100条就要发送100次网络请求,开销较大 如果批量查询,查询100条数据,只用发送1次网络请求,减少网络开销
2、单条查询
GET tianmao/product/1
3、mget 语法
GET /_mget
{
"docs": [{
"_index": "taobao",
"_type": "product",
"_id": 1
},
{
"_index": "tianmao",
"_type": "product",
"_id": 1
}
]
}
4、查询同一个index下不同type数据
GET taobao/_mget
{
"docs": [
{
"_type": "product",
"_id": 1
},
{
"_type": "product",
"_id": 2
}
]
}
5、查询同一个index下,同type数据
GET taobao/product/_mget
{
"ids": [1, 2]
}
6、mget很重要 如果一次查询多条数据,那么久使用mget,减少网络开销
七、批量操作
1、bulk语法
每次操作有两个json串,放在bulk里
POST /_bulk
{"action": {"metadata"}}
{"data"}
例如 创建一个文档
{"index": {"_index": "taobao", "_type": "product", "_id": "1"}}
{"name": "Dog", "price": 25}
2、可以执行bulk操作的类型 (1)delete 删除文档 (2)create 强制创建 PUT /index/type/id/_create
(3)index 普通PUT,创建文档,也可是全量替换文档 (4)update 执行的partial update操作
注意: 单个json串不能换行,json串之间必须换行 任意一个操作失败,不会影响其他操作,会在结果中显示
示例
POST _bulk
{"create": {"_index": "taobao", "_type": "product", "_id": 4}}
{"name": "Pig", "price": 36}
{"delete": {"_index": "taobao", "_type": "product", "_id": 1}}
{"index": {"_index": "taobao", "_type": "product", "_id": 5}}
{"name": "Pig", "price": 45}
{"update": {"_index": "taobao", "_type": "product", "_id": 5}}
{"doc": {"price": 54}}
3、指定index
POST taobao/_bulk
{"create": {"_type": "product", "_id": 4}}
{"name": "Pig", "price": 36}
4、指定index 和type
POST taobao/product/_bulk
{"create": {"_id": 4}}
{"name": "Pig", "price": 36}
5、bulk性能 bulk request会加载到内存中,如果太大反而会降低性能,因此需要反复尝试一个最佳的bulk size 一般从1000~5000条数据开始,尝试增加 如果看大小,最好在5~15MB之间
八、阶段总结
1、阶段性总结 1-8 快速入门,基本原理,基本操作 9-13 ES分布式的基本原理, 14-27 document讲解
ElasticSearch 最核心功能 分布式文档的数据存储系统
分布式 distributed document store 文档数据 可以存储json格式的文档,核心数据结构 存储系统 对数据进行存储,查询,创建,更新,删除
一个NoSQL存储系统 操作document
可以开发的应用程序 (1)数据量较大,快速进行扩容,承载大量数据 (2)数据结构灵活操作,不用去处理传统数据库中的表 (3)对数据操作较为简单,基本的增删改查 (4)NoSQL数据库,也适用以上场景
九、document数据路由原理
1、document路由,数据路由 一个index会被分成多片,每片都在一个shard中, 一个document,只能存在一个shard中 客户端创建document的时候,es就会决定document存放在哪个shard上
2、路由算法
shard = hash(routing) % number_of_primary_shards
举例: 一个index 有3个primary shard P0, P1, P2 (1)增删改查document时候,会有一个routing number 默认 routing = id (2)传入hash函数产生routing的hash值 hash(routing) = 21 (3)hash值 对index 的primary shard数量取余 21 % 3 =0 (4)就决定了这个document 放在 P0之上
决定document 在哪个shard上,最重要的值是routing,默认是id,也可以手动指定 相同的routing值,每次求hash再求余,结果也一样 结果一定在 0 ~ number_of_primary_shards - 1之间
默认的routing是id,也可以手动指定 以便后续进行应用级别的负载均衡,以及提升批量读取的性能也很有帮助
PUT /index/type/id?routing=<user_id>
基于document路由算法,primary shard建立之后就不能修改,但是replica shard可以修改
十、document增删改内部原理
1、客户端选择一个node发送请求过去,这个node就是coordinating node(协调节点) 2、coordinating node 对document进行路由,将请求转发给对应的node(有primary shard) 3、实际的node上primary shard处理请求,然后将数据同步到replica shard 4、coordinating node 如果发现primary node 和所属 replica node都搞定之后,就放回响应结果给客户端
例如 3个节点,3个primary shard replica=1, replica shard=3 总共6个shard
client 要创建一个document docuemnt可以发送任意一个node
增删改操作只能由primary shard操作,会自动同步到replica sahrd
十一、分布式文档系统
1、consistency
发送增删改操作的时候,可以带上一个consistency参数,指明我们想要的写一致性
PUT /index/type/id?consistency=quorum
one(primary shard): 写操作,只要primary shard是active就可以执行 all(all shard): 写操作,必须所有priamry shard和replica shard都是active才执行 quorum(default): 默认值,要求所有的shard,大部分shar都是活跃的才可以执行写操作
2、quorum机制 写之前必须确保大多数shard都可用,当number_of_replica > 1时才生效
计算公式
quorum = int((primary + number_of_replica)/2) + 1
举例: primary_shard=3, number_of_replica=1, 总shard=6 quorum = int((3 + 1)/2) + 1 = 3
所以6个shard,至少3个shard处于active,才可以执行写操作
3、如果节点数量少于quorum数量,可能导致quorum不齐全,导致无法执行写操作 primary 和replica 必须在不同节点上,如果两个机器,有可能出现3个shard分配不齐全, 此时可能回出现写操作无法执行
例如: node=2, primay=1 replica=3 quorum = int((primay + replica)/2) + 1 = 3 node1 -> primary node2 -> replica1 所以active=2 < quorum=3, 不满足quorum机制
提供了一个特殊场景,就是number_of_replica>1才生效 如果不做处理,单节点就无法工作
4、quorum不齐全时,wait,默认1分钟,timeout 写操作可以设置timeout参数,单位 毫秒
PUT /index/type/id?timeout=30
十二、document查询内部原理
1、客户端发送请求到任意一个node
2、coordinating node接收到document读请求后,进行路由,转发给对应的node, 随机轮询算法在primary shard 和 replica shard中随机选一个shard,使得负载均衡
3、接收请求的node返回document给coordination node
4、coordinating node返回document给客户端
5、特殊情况,document如果还在建立索引过程中,可能只有primary shard有数据,replica shard没数据, 可能导致无法读取到document,但是document完成索引建立之后,primary shard和replica shard就都有数据了
对于读请求,不一定将请求转发到primary shard也可以转发到replica shard replica shard也可以处理所有的读请求
使用round-robin 随机轮询算法 比如说: coordinating node接收到一个document的4次查询,就会使用算法 将2次查询请求转发给P1,将2次查询请求转发给R1 尽量让primary shard 和所有的replica shard均匀服务读请求,使得负载均衡
十三、bulk api json格式与底层性能
bulk api Json格式
{"action": "meta"}
{"data"}
1、bulk中的每个操作都可能要转发到不同node的shard去执行
2、如果采用比较良好的json数组格式 (1)将json数组解析为JSONArray对象 文本->JsonArray对象 (2)对每个请求中的document进行路由 (3)为路由到同一个shard上的多个请求,创建一个请求数组 (4)将序列化后的数据发送到对应的节点上去
3、耗费更多内存,更多的jvm开销 bulk size 最佳大小,如果单次传输数据过多,内存占用过多,性能下降,垃圾回收耗费时间
4、现在的方式 (1)直接按照换行符切割json (2)对每两个一组的json,读取mete,进行document路由 (3)直接将对应的json发送到node上去
5、最大的优势在于,不需要将json数组解析为一个JSONArray对象, 形成一份大数据拷贝,避免了内存空间的浪费