博客
关于我
logstash mysql 准实时同步到 elasticsearch
阅读量:792 次
发布时间:2023-02-06

本文共 3234 字,大约阅读时间需要 10 分钟。

MySQL 到 Elasticsearch 数据同步:使用 Logstash 的完整配置指南

MySQL 作为一款成熟稳定的数据持久化解决方案,在多个领域中得到了广泛应用。然而,在数据分析方面,MySQL 的表现稍显不足。为了弥补这一不足,Elasticsearch 作为数据分析领域的领军者,提供了一种强大的数据处理和搜索功能。通过将 MySQL 的数据实时同步到 Elasticsearch 中,我们可以充分发挥两者的优势。本文将详细介绍如何使用 Logstash 实现 MySQL 到 Elasticsearch 的数据同步。


Logstash 获取与安装

首先,我们需要下载并安装 Logstash。以下是获取 Logstash 的步骤:

wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.3.zipunzip logstash-6.2.3.zipcd logstash-6.2.3

安装完成后,我们需要安装必要的插件:

bin/logstash-plugin install logstash-input-jdbcbin/logstash-plugin install logstash-output-elasticsearch

为了确保 Logstash 与 MySQL 的连接,我们需要下载适用于 JDBC 的 MySQL 驱动:

wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zipunzip mysql-connector-java-5.1.46.zip

配置文件编写

接下来,我们需要编写 Logstash 的配置文件。配置文件将包含以下两部分:输入插件(logstash-input-jdbc)和输出插件(logstash-output-elasticsearch)。


1. 输入插件(logstash-input-jdbc

输入插件用于从 MySQL 数据库中读取数据。以下是典型的配置示例:

input {  jdbc {    jdbc_driver_library => "../mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar"    jdbc_driver_class => "com.mysql.jdbc.Driver"    jdbc_connection_string => "jdbc:mysql://
:3306/rta" jdbc_user => "
" jdbc_password => "
" schedule => "* * * * *" statement => "SELECT * FROM table WHERE update_time >= :sql_last_value" use_column_value => true tracking_column_type => "timestamp" tracking_column => "update_time" last_run_metadata_path => "syncpoint_table" }}

参数说明:

  • jdbc_driver_library:MySQL JDBC 驱动的路径。
  • jdbc_driver_class:驱动类的全名,填写 com.mysql.jdbc.Driver
  • jdbc_connection_string:MySQL 连接地址,格式为 jdbc:mysql://<host>:<port>/<database>
  • jdbc_userjdbc_password:MySQL 用户名和密码。
  • schedule:定义数据同步的时间调度,格式类似 crontab。
  • statement:执行的 SQL 语句。sql_last_value 是一个内置变量,表示上一次同步时的 update_time 值。
  • use_column_value:启用增量同步,使用指定的列来跟踪同步进度。
  • tracking_column_type:递增字段的类型,支持 numerictimestamp
  • tracking_column:递增字段的名称。
  • last_run_metadata_path:记录上一次同步的文件路径。

注意事项:

  • 如果表中没有递增字段,建议添加一个 update_time 字段,类型为 timestamp
  • 递增字段可以是 id(主键)或 update_time,前者适用于只插入数据的表,后者更通用。

2. 输出插件(logstash-output-elasticsearch

输出插件用于将数据发送到 Elasticsearch。以下是典型的配置示例:

output {  elasticsearch {    hosts => ["172.31.22.165", "172.31.17.241", "172.31.30.84", "172.31.18.178"]    user => "
" password => "
" index => "table" document_id => "%{id}" }}

参数说明:

  • hosts:Elasticsearch 集群地址,支持多个地址。
  • userpassword:Elasticsearch 用户名和密码。
  • index:数据将被导入的 Elasticsearch 索引名称。
  • document_id:文档 ID。建议将其设置为 MySQL 表中的主键字段 %{id},以避免数据重复。

运行步骤

将以上配置文件保存为 sync_table.cfg,然后执行以下命令启动 Logstash:

cd logstash-6.2.3bin/logstash -f config/sync_table.cfg

其他问题

1. 多表同步

如果需要同步多个表,可以在不同的配置文件中分别配置每个表的同步规则。例如:

pipelines.yml: - pipeline.id: table1   path.config: "config/sync_table1.cfg" - pipeline.id: table2   path.config: "config/sync_table2.cfg"

启动 Logstash 时,执行以下命令:

bin/logstash

2. @timestamp 字段

默认情况下,Logstash 会为每条记录添加 @timestamp 字段,值为当前时间戳。为了使用数据中的某个字段作为 @timestamp,可以使用 filter.date 插件。例如:

filter {  date {    match => [ "timeslice", "yyyyMMddHHmm" ]    timezone => "Asia/Shanghai"  }}

将此配置添加到 sync_table.cfg 文件中,可以确保 @timestamptimeslice 字段保持一致。


参考链接


通过以上配置,您可以轻松实现 MySQL 数据到 Elasticsearch 的实时同步,充分发挥两者结合的优势。

转载地址:http://hcufk.baihongyu.com/

你可能感兴趣的文章
LR参数化
查看>>
LR测试文件上传
查看>>
ls 执行脚本关联
查看>>
LSTM-在一段时间后预测相同的常量值
查看>>
LSTM介绍-ChatGPT4o作答
查看>>
LSTM入门学习——本质上就是比RNN的隐藏层公式稍微复杂了一点点而已
查看>>
LSTM错误:AttributeError:‘;tuple‘;对象没有属性‘;dim‘;
查看>>
lsusb
查看>>
ls实现排序
查看>>
LS(链路状态)算法及matlab仿真
查看>>
ltrace命令详解
查看>>
lua coroutine
查看>>
Lua JIT
查看>>
Lua 中写 C 扩展库时用到的一些技巧
查看>>
Lua 中的元表(Metatable)在实际开发中的高级应用场景有哪些?
查看>>
lua 协程
查看>>
Lua 基础语法与代码编写规范
查看>>
Lua 开发环境搭建
查看>>
Lua 的协程在并发编程中有哪些独特的应用场景和优势?
查看>>
Lua 笔记-lua的与众不同处
查看>>