登录
登录 注册新账号
注册
已有账号登录
黑马-Linux云计算+运维开发+全新升级V3版本|完结无密
王之 阅读 142 次
12月8日发布

download:黑马-Linux云计算+运维开发+全新升级V3版本|完结无密

跨机房ES同步实战
背景
众所周知,当单个机房出现不可抗拒的问题(如停电、断网等因素)时,会导致无法正常提供服务,给业务造成潜在损失。因此,在协同办公领域,一个基于同城或异地多活动机制的高可用设计,在保证数据一致性的同时,可以最大限度地减少机房单点可用带来的潜在高可用问题,最大限度地提升业务的用户体验,减少单点问题带来的潜在损失。

同城双岗对于生产的高可用性保障具有重要的意义和价值。表面上看,同城双活只是一套生产环境的简单部署,但在架构上,这种变化的影响是巨大的,比如无状态应用的高可用管理,请求流量的管理,版本发布的管理,网络架构的管理等。,其升级后的架构复杂度巨大。

结合真实的协同办公产品:驻京办(为北京市政府提供协同办公服务的综合平台)面临的复杂政务网络,以及驻京办同城双活架构演进的案例,介绍驻京办在不断完善和逐步演进过程中的一些思考和实践经验。本文仅介绍和总结es集群在跨机房同步过程中的方案和经验。

结构
1.在金山云机房部署Logstash,Logstash启动多个实例(按不同类型分类,提高同步效率),与金山云机房es集群在同一个VPC。

2.Logstash需要配置较大的网络访问权限,以保证Logstash和ES的原集群与目标集群相互通信。

3.数据迁移可以是完全迁移和增量迁移。第一次迁移是在完全迁移后添加数据的增量迁移。

4.增量迁移需要修改增量数据的标识,具体方法后面会介绍。

原则
Logstash的工作原理

Logstash分为三个部分:输入、过滤和输出:

1.input处理接收到的数据,这些数据可以来自es、日志文件、kafka等渠道。

2 .过滤器过滤并清理数据。

3.ouput将数据输出到目标设备,可以导出到ES、kafka、files等。

增量同步原理
对于时间T的数据,先用Logstash把T之前的数据全部迁移到福机房的京东云ES,假设需要时间T。
对于T到T+T的增量数据,再次使用logstash将数据导入到傅机房京东云的ES集群中。
重复上述步骤2,直到T足够小,然后将业务切换到华为云,最后完成新数据的迁移。
适用范围:ES数据有时间戳或其他可以区分旧数据和新数据的标签。

准备工作/即将开始工作
1.创建ECS并安装JDK。别理它,自己装。

2.下载相应版本的Logstash,尽量选择与Elasticsearch版本一致或接近的版本安装。
1)下载源代码,直接解压安装包,开箱即用。
迁移指数
Logstash会帮助用户自动创建索引,但是自动创建的索引和用户自己的索引会有一些差异,导致最终数据的搜索格式不一致。通常,需要手动创建索引,以确保索引中的数据完全一致。

提供了以下用于创建索引的python脚本。用户可以使用这个脚本来创建所需的索引。

文件create_mapping.py是用于同步索引的python脚本,config.yaml是集群地址配置文件。

注意:使用此脚本需要安装相关的依赖项。

yum install -y PyYAML
yum install-y python-请求
复制以下代码,并将其另存为create_mapping.py:

导入yaml
导入请求
导入json
导入getopt
导入系统

定义帮助():
打印
"""
用法:
-h/ -帮助打印此帮助。
-c/ - config配置文件路径,默认为config.yaml

示例:
python create _ mapping . py-c config . YAML
"""
定义进程映射(索引映射,目标索引):
打印(索引映射)

移除不必要的密钥

del index _ mapping[" settings "][" index "][" provided _ name "]
del index _ mapping[" settings "][" index "][" uuid "]
del index_mapping["设置"]["索引"]["创建日期"]
del index _ mapping[" settings "][" index "][" version "]

检查别名

别名=索引映射["别名"]
对于列表中的别名(aliases.keys()):
如果别名==目标索引:
打印(
源索引“+目标索引+”别名“+别名+”与目标索引名称相同,将删除此别名。)
del index_mapping["别名"][别名]
if index _ mapping[" settings "][" index "]。has_key("生命周期"):
生命周期=索引映射["设置"]["索引"]["生命周期"]
open distro = { " open distro ":{ " index _ state _ management ":
{"policy_id ":生命周期["name"],
" rollover_alias ":生命周期["rollover_alias"]}}
index_mapping["settings"]。更新(开放发行版)

index _ mapping[" settings "][" open distro "][" index _ state _ management "][" rollover _ alias "]= life cycle[" rollover _ alias "]

del index_mapping["设置"]["索引"]["生命周期"]
打印(索引映射)
返回索引_映射
def put_mapping_to_target(url,映射,source_index,dest_auth=None):
headers = { ' Content-Type ':' application/JSON ' }
create_resp = requests.put(url,headers=headers,data=json.dumps(mapping),auth=dest_auth)
if create_resp.status_code!= 200:
打印(
创建索引“+ url +”失败,响应为:“+ str(create_resp) +”,源索引为“+ source_index)
打印(创建响应文本)
with open(source_index +"。json”,“w”)作为f:
json.dump(映射,f)
def main():
config_yaml = "config.yaml "
opts,args = getopt . getopt(sys . argv[1:],'-h-c:',['help ',' config='])
对于opt_name,opt中的opt_value:
if opt_name in ('-h ','- help '):
帮助()
退出()
if opt_name in ('-c ','- config '):
config_yaml = opt_value

配置文件=打开(配置文件)
config = yaml.load(配置文件)
source = config["source"]
源用户=配置["源用户"]
源密码=配置["源密码"]
源身份验证=无
if source_user!= "":
源身份验证=(源用户,源密码)
dest = config["目的地"]
目标用户=配置["目标用户"]
dest_passwd = config["目的地_passwd"]
dest_auth =无
if dest_user!= "":
dest_auth =(目标用户,目标密码)
打印(来源身份验证)
打印(目的地验证)

只处理映射列表

if配置["only_mapping"]:
对于source_index,配置[“映射”]中的dest_index。iteritems():
print("开始处理源索引"+源索引+",目标索引:"+目标索引)
源url =源+ "/" +源索引
response = requests.get(源url,auth =源身份验证)
if response.status_code!= 200:
打印(" ***获取ElasticSearch消息失败。resp statusCode:" + str(
response.status_code) +"响应是"+ response.text)
继续
mapping = response.json()
index_mapping = process_mapping(映射[源索引],目标索引)

目标url =目标+ "/" +目标索引
put_mapping_to_target(目标url,索引映射,源索引,目标验证)
print(" process source index "+source _ index+" to target index "+dest _ index+"成功。")
否则:

获取所有索引

response = requests . get(source+"/_ alias ",auth=source_auth)
if response.status_code!= 200:
打印(" ***获取所有索引失败。resp statusCode:" + str(
response.status_code) +"响应是"+ response.text)
退出()
all_index = response.json()
对于列表中的索引(all_index.keys()):
如果“.”在索引中:
继续
print("开始处理源索引"+索引)
source_url = source + "/" + index
index_response = requests.get(源url,auth =源身份验证)
if index_response.status_code!= 200:
打印(" ***获取ElasticSearch消息失败。resp statusCode:" + str(
index_response.status_code) +"响应是"+ index_response.text)
继续
mapping = index_response.json()

dest_index =索引
如果索引在配置[“映射”]。密钥():
dest _ index = config[" mapping "][index]
index_mapping = process_mapping(映射[索引],目标索引)

目标url =目标+ "/" +目标索引
put_mapping_to_target(目标url,索引映射,索引,目标验证)
print("处理源索引"+ index +"到目标索引"+ dest_index +"成功。")

if name == 'main ':
主()
将配置文件另存为config.yaml:

源ES群集地址,加上http://

来源:http://ip:port
源用户:“用户名”
source_passwd:"密码"

目标ES集群地址,加上http://

目的地:http://ip:port
目的地用户:“用户名”
destination_passwd:"密码"

您想只处理此文件中映射地址的索引吗?

如果设置为true,则在目标上将只获取和创建以下映射中的索引。

如果设置为false,将采用源群集的所有索引,除了(。基巴纳)

如果使用映射值作为目标匹配索引名称,则使用以下映射匹配索引名称。

如果不匹配,则使用源的原始索引名。

仅_映射:真

要迁移的索引,key是源的索引名称,value是目标的索引名称。

映射:
源索引:目标索引
上面的代码和配置文件都准备好了,直接执行python create_mapping.py就可以完成索引同步了

当索引被同步时,您可以在目标集群的kibana上查看它,或者执行curl来查看索引迁移。

至于京办从金山云机房迁移到JD.COM富友机房,涉及的业务领域较多,各业务线代表的新增记录时间戳字段不统一,涉及大量兼容工作量。因此,考虑通过elasticsearch中的预处理函数管道增加一个统一的增量标记字段:gmt_created_at,以降低迁移的复杂度(各业务线可以评估这一步是否有必要)。

结合业务特点,通过在过滤器中加入少量的ruby代码,将_routing的值取出放回logstah事件中,从而解决了问题。