时间会让我们更好。   

MySQL同步数据至ElasticSearch

    上篇文章介绍了搭建ElasticSearch环境,有了环境,我们还差啥?对的,数据。我们的数据一般存储在关系型数据Oracle或者MySQL里面,这里介绍一种好用的同步工具- Logstash

一.介绍

什么是Logstash

Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置,比如某个服务器或者文件。


二.安装和测试

1.下载

我测试的版本是5.6.8,贴出官网下载的地址:Logstash 5.6.8

2.解压测试

下载好后,解压进入bin目录,输入下面一行命令测试:

./logstash -e 'input { stdin { } } output { stdout {} }'

如果没有敲错,正确的结果会是:

[xiaotang@hecg-pc bin]$ ./logstash -e 'input { stdin { } } output { stdout {} }'
Sending Logstash's logs to /data/tools/logstash-5.6.8/logs which is now configured via log4j2.properties
[2019-04-16T14:12:04,523][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"fb_apache", :directory=>"/data/tools/logstash-5.6.8/modules/fb_apache/configuration"}
[2019-04-16T14:12:04,528][INFO ][logstash.modules.scaffold] Initializing module {:module_name=>"netflow", :directory=>"/data/tools/logstash-5.6.8/modules/netflow/configuration"}
[2019-04-16T14:12:04,766][INFO ][logstash.pipeline        ] Starting pipeline {"id"=>"main", "pipeline.workers"=>4, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>5, "pipeline.max_inflight"=>500}
[2019-04-16T14:12:04,870][INFO ][logstash.pipeline        ] Pipeline main started
The stdin plugin is now waiting for input:
[2019-04-16T14:12:04,935][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
aaaa
2019-04-16T06:12:12.087Z hecg-pc aaaa
  • stdin,表示输入流,指从键盘输入
  • stdout,表示输出流,指从显示器输出
  • 命令行参数: -e 执行 -f 配置文件,后跟参数类型可以是一个字符串的配置或全路径文件名或全路径

三.项目使用示例

1.创建配置文件

创建一个配置文件:mysql_car.conf,内容如下:

input {
    jdbc {
        # mysql jdbc connection string to our backup databse
        jdbc_connection_string =>
        "jdbc:mysql://192.168.88.199:3306/car-mysql?characterEncoding=UTF8"
        # the user we wish to excute our statement as
        jdbc_user => "root"
        jdbc_password => "root"
        # the path to our downloaded jdbc driver
        jdbc_driver_library => "/data/tools/drivers/mysql-connector-java-5.1.40.jar"
        # the name of the driver class for mysql
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        jdbc_paging_enabled => "true"
        jdbc_page_size => "50000"
        #以下对应着要执行的sql的绝对路径。
        statement => "select id,name,isenabled,isremoved from car_style"
        #定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为 每分钟都更新
        schedule => "* * * * *"
    }
}
output {
    elasticsearch {
        #ESIP地址与端口
        hosts => "localhost:9200"
        #ES索引名称(自己定义的)
        index => "ycstyle"
        #自增ID编号
        document_id => "%{id}"
        document_type => "article"
    }
    stdout {
        #以JSON格式输出
        codec => json_lines
    }
}

2.指定配置文件启动

./logstash -f /data/tools/logstash-5.6.8/mysqletc/mysql_car.conf

再次刷新elasticsearch-head的数据显示,看是否也更新了数据。