本文共 3234 字,大约阅读时间需要 10 分钟。
MySQL 作为一款成熟稳定的数据持久化解决方案,在多个领域中得到了广泛应用。然而,在数据分析方面,MySQL 的表现稍显不足。为了弥补这一不足,Elasticsearch 作为数据分析领域的领军者,提供了一种强大的数据处理和搜索功能。通过将 MySQL 的数据实时同步到 Elasticsearch 中,我们可以充分发挥两者的优势。本文将详细介绍如何使用 Logstash 实现 MySQL 到 Elasticsearch 的数据同步。
首先,我们需要下载并安装 Logstash。以下是获取 Logstash 的步骤:
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.2.3.zipunzip logstash-6.2.3.zipcd logstash-6.2.3
安装完成后,我们需要安装必要的插件:
bin/logstash-plugin install logstash-input-jdbcbin/logstash-plugin install logstash-output-elasticsearch
为了确保 Logstash 与 MySQL 的连接,我们需要下载适用于 JDBC 的 MySQL 驱动:
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.46.zipunzip mysql-connector-java-5.1.46.zip
接下来,我们需要编写 Logstash 的配置文件。配置文件将包含以下两部分:输入插件(logstash-input-jdbc
)和输出插件(logstash-output-elasticsearch
)。
logstash-input-jdbc
)输入插件用于从 MySQL 数据库中读取数据。以下是典型的配置示例:
input { jdbc { jdbc_driver_library => "../mysql-connector-java-5.1.46/mysql-connector-java-5.1.46-bin.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://:3306/rta" jdbc_user => " " jdbc_password => " " schedule => "* * * * *" statement => "SELECT * FROM table WHERE update_time >= :sql_last_value" use_column_value => true tracking_column_type => "timestamp" tracking_column => "update_time" last_run_metadata_path => "syncpoint_table" }}
jdbc_driver_library
:MySQL JDBC 驱动的路径。jdbc_driver_class
:驱动类的全名,填写 com.mysql.jdbc.Driver
。jdbc_connection_string
:MySQL 连接地址,格式为 jdbc:mysql://<host>:<port>/<database>
。jdbc_user
和 jdbc_password
:MySQL 用户名和密码。schedule
:定义数据同步的时间调度,格式类似 crontab。statement
:执行的 SQL 语句。sql_last_value
是一个内置变量,表示上一次同步时的 update_time
值。use_column_value
:启用增量同步,使用指定的列来跟踪同步进度。tracking_column_type
:递增字段的类型,支持 numeric
和 timestamp
。tracking_column
:递增字段的名称。last_run_metadata_path
:记录上一次同步的文件路径。update_time
字段,类型为 timestamp
。id
(主键)或 update_time
,前者适用于只插入数据的表,后者更通用。logstash-output-elasticsearch
)输出插件用于将数据发送到 Elasticsearch。以下是典型的配置示例:
output { elasticsearch { hosts => ["172.31.22.165", "172.31.17.241", "172.31.30.84", "172.31.18.178"] user => "" password => " " index => "table" document_id => "%{id}" }}
hosts
:Elasticsearch 集群地址,支持多个地址。user
和 password
:Elasticsearch 用户名和密码。index
:数据将被导入的 Elasticsearch 索引名称。document_id
:文档 ID。建议将其设置为 MySQL 表中的主键字段 %{id}
,以避免数据重复。将以上配置文件保存为 sync_table.cfg
,然后执行以下命令启动 Logstash:
cd logstash-6.2.3bin/logstash -f config/sync_table.cfg
如果需要同步多个表,可以在不同的配置文件中分别配置每个表的同步规则。例如:
pipelines.yml: - pipeline.id: table1 path.config: "config/sync_table1.cfg" - pipeline.id: table2 path.config: "config/sync_table2.cfg"
启动 Logstash 时,执行以下命令:
bin/logstash
@timestamp
字段默认情况下,Logstash 会为每条记录添加 @timestamp
字段,值为当前时间戳。为了使用数据中的某个字段作为 @timestamp
,可以使用 filter.date
插件。例如:
filter { date { match => [ "timeslice", "yyyyMMddHHmm" ] timezone => "Asia/Shanghai" }}
将此配置添加到 sync_table.cfg
文件中,可以确保 @timestamp
与 timeslice
字段保持一致。
通过以上配置,您可以轻松实现 MySQL 数据到 Elasticsearch 的实时同步,充分发挥两者结合的优势。
转载地址:http://hcufk.baihongyu.com/