Elasticsearch 基于logstash 同步MySQL8 数据解析

熊孩纸 阅读:266 2021-03-31 13:47:59 评论:0

概述

  在生成业务常有将MySQL数据同步到ES的需求,如果需要很高的定制化,往往需要开发同步程序用于处理数据。但没有特殊业务需求,官方提供的logstash就很有优势了。
  在使用logstash我们应先了解其特性,再决定是否使用:

  • 无需开发,仅需安装配置logstash即可;
  • 凡是SQL可以实现的logstash均可以实现(本就是通过sql查询数据)
  • 支持每次全量同步或按照特定字段(如递增ID、修改时间)增量同步;
  • 同步频率可控,最快同步频率每分钟一次(如果对实效性要求较高,慎用);
  • 不支持被物理删除的数据同步物理删除ES中的数据(可在表设计中增加逻辑删除字段IsDelete标识数据删除)。

1、安装

  前往官网下载logstash,下载地址www.elastic.co/downloads/l…,zip压缩包大约160M(觉得官网下载慢的可前往@zxiaofan的CSDN下载);
  程序目录:【windows】C:\logstash-6.3.2;

2、配置

2.1、新建目录存放配置文件及mysql依赖包

  在【windows】目录(\bin同级)新建elasticsearch目录,将下载好的mysql-connector-java-8.0.11.jar放入此目录;
  在【windows】\elasticsearch目录新建elasticsearch.conf文件,此文件将配置数据库连接信息、查询数据sql、分页信息、同步频率等核心信息。
  注意事项请查看注释信息。

2.2、单表同步配置

input { 
  stdin { 
  } 
  jdbc { 
  jdbc_connection_string => "jdbc:mysql://192.168.0.1:3306/***?serverTimezone=Asia/Shanghai&useSSL=false&allowPublicKeyRetrieval=true&allowMultiQueries=true" 
  jdbc_user => "root" 
  jdbc_password => "123456" 
  jdbc_driver_library => "../elasticsearch/mysql-connector-java-8.0.11.jar" 
  jdbc_driver_class => "com.mysql.cj.jdbc.Driver" 
  statement => "select  
							tb.sid as sid, 
							tb.unit_proj_sid, 
							tb.individual_proj_sid, 
							tb.eng_proj_sid, 
                            (select CONCAT(organization_code, '|', organization_name) from ucas_auth_organization where ucas_auth_organization.organization_code =  tb.organization_code) as organization_code_name, 
                            (select CONCAT(assort_num, '|', assort_name) from sys_assort_bus where sys_assort_bus.assort_num =  tb.assort_num) as assort_num_name, 
                            tb.organization_code as organization_code, 
                            tb.assort_num as assort_num, 
							tb.arch_id, 
							tb.unit_proj_arch_no, 
							tb.reg_no, 
							tb.arch_title, 
							tb.make_org_name, 
							(select sys_data_dict.value_ from sys_data_dict where sys_data_dict.key_ =  tb.arch_type_code and sys_data_dict.category_code ='ARCH_TYPE_CODE') as arch_type_code, 
							tb.spec_code, 
							tb.start_date, 
							tb.end_date, 
						    (select sys_data_dict.value_ from sys_data_dict where sys_data_dict.key_ =  tb.security_level_code and sys_data_dict.category_code ='SECURITY_LEVEL_CODE') as security_level_code, 
							(select sys_data_dict.value_ from sys_data_dict where sys_data_dict.key_ =  tb.storage_type_code and sys_data_dict.category_code ='STORAGE_TYPE_CODE') as storage_type_code, 
							'案卷信息' AS arch_type_name, 
							tb.arch_type, 
							tb.arch_category_code, 
							tb.archiving_date, 
							DATE_FORMAT(tb.archiving_date, '%Y') as archiving_date_year 
					from ucas_arch_info tb where tb.arch_type = 2" 
  } 
} 
output { 
  elasticsearch { 
    hosts => "192.168.1.105:9200" 
    index => "blog" 
	document_type =>"bookInfo" 
    document_id => "%{sid}" 
  } 
  stdout { 
    codec => json_lines 
  } 
}

3、启动运行

  在【windows】目录执行以下命令启动:

【windows】 
C:\logstash-6.3.2\logstash-6.3.2\bin>logstash.bat -f  
     ../elasticsearch/elasticsearch.conf

 在【windows】\logs目录会有运行日志

Note:
  5.x/6.X/7.x版本需要jdk8支持,如果默认jdk版本不是jdk8,那么需要在logstash或logstash.lib.sh的行首位置添加两个环境变量:

export JAVA_CMD="/usr/tools/jdk1.8.0_162/bin" 
export JAVA_HOME="/usr/tools/jdk1.8.0_162/"

 

 

 

声明

1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,请转载时务必注明文章作者和来源,不尊重原创的行为我们将追究责任;3.作者投稿可能会经我们编辑修改或补充。

发表评论
搜索
排行榜
KIKK导航

KIKK导航

关注我们