Mysqlからlogstashを使ってElasticsearchに同期させるをやってみた

MysqlなどのRDBからlogstashを使ってElasticsearchに同期させるというのは既に様々な方が実践されているようだが、自分はやってみたことなかったのでやってみました。 検証の動機としては、今まではRDBに入った値をElasticsearchに同期させるために自前で定期的にElasticsearchにドキュメントを追加または更新する処理を書いていたが、それをしなくも良いとなると随分楽になるので、実際どういう風に書けるのか知りたかったからです。

環境

自分が今回試した環境は以下です。

  • Elasticsearch: 7.10.1
  • Mysql: 8.0
  • logstash: 7.10.1

設定

Mysql

今回同期させたいテーブルは以下のような構成です。

CREATE TABLE `tickets` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `title` varchar(80) COLLATE utf8mb4_general_ci NOT NULL,
  `description` varchar(1024) COLLATE utf8mb4_general_ci NOT NULL,
  `point` int DEFAULT '0',
  `user_id` bigint DEFAULT NULL,
  `created_at` datetime(6) NOT NULL,
  `updated_at` datetime(6) NOT NULL,
  PRIMARY KEY (`id`),
  KEY `index_tickets_on_user_id` (`user_id`)
) ENGINE=InnoDB AUTO_INCREMENT=10 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

Elasticsearch

Elaticsearchのマッピングは以下のような定義にしています。

$ curl -XGET 'localhost:9200/es_tickets/_mapping?pretty'

{
  "es_tickets" : {
    "mappings" : {
      "properties" : {
        "@timestamp" : {
          "type" : "date"
        },
        "@version" : {
          "type" : "text",
          "fields" : {
            "keyword" : {
              "type" : "keyword",
              "ignore_above" : 256
            }
          }
        },
        "created_at" : {
          "type" : "date"
        },
        "creator_name" : {
          "type" : "text",
          "fields" : {
            "raw" : {
              "type" : "keyword"
            }
          },
          "analyzer" : "kuromoji_analyzer"
        },
        "description" : {
          "type" : "text",
          "analyzer" : "kuromoji_analyzer"
        },
        "description2" : {
          "type" : "text",
          "analyzer" : "ngram_analyzer"
        },
        "id" : {
          "type" : "long"
        },
        "point" : {
          "type" : "long"
        },
        "title" : {
          "type" : "text",
          "fields" : {
            "raw" : {
              "type" : "keyword"
            }
          },
          "analyzer" : "kuromoji_analyzer"
        },
        "title2" : {
          "type" : "text",
          "analyzer" : "ngram_analyzer"
        },
        "unix_ts_in_secs" : {
          "type" : "float"
        },
        "updated_at" : {
          "type" : "date"
        },
        "user_id" : {
          "type" : "long"
        }
      }
    }
  }
}

今回はdescriptionとtitleで二つのanalyzerを定義しているmappingで、このindexにdocumentを同期させるようにlogstashを設定したいと思いました。

logstash

logstashの定義です。

Dockerfileを以下のように書きました。

FROM docker.elastic.co/logstash/logstash:7.17.1
RUN rm -rf logstash/config
RUN rm -rf logstash/pipeline
COPY config /user/share/logstash/config
COPY pipeline /user/share/logstash/pipeline
COPY --chmod=755 mysql-connector-java-8.0.17.jar /user/share/logstash/mysql-connector-java-8.0.17.jar

permissionを指定しないと、permissionのエラーが出てdriverを読み込めなかったので指定しています。 続いてpipeline直下に配置したjdbcドライバの設定をfilterの設定を記述したconfファイルです。

input {
  jdbc {
    jdbc_driver_library => "/user/share/logstash/mysql-connector-java-8.0.17.jar"
    jdbc_driver_class => "com.mysql.cj.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://db:3306/hogehoge_db"
    jdbc_default_timezone => "Asia/Tokyo"
    jdbc_user => "root"
    jdbc_password => ""
    jdbc_default_timezone => "Asia/Tokyo"
    tracking_column => 'id'
    tracking_column_type => "numeric"
    use_column_value => true
    schedule => "* * * * *"
    statement => "SELECT id, title, description, point, user_id, created_at, updated_at FROM tickets where id > :sql_last_value order by id asc"
  }
}

filter {
    mutate {
        add_field => { "description2" => "description" }
        add_field =>  { "title2" => "title" } 
    }
}

output {
  elasticsearch {
    hosts => ["elasticsearch"]
    index => "es_tickets"
    document_id => "%{id}"
  }
}

inputのstatementのところでmysqlから同期するときに、どの条件のデータを取得しElaticsearchに入れるのかsqlで指定します。「:sql_last_value」で最後に取得したレコード以降を取得すること条件で指定しています。今回はidカラムを指定しているので、tracking_columnでidを指定し、use_column_valueでidカラムのようにテーブルで定義したカラムををsql_last_valueにセットするように指定します。またscheduleで同期タイミングを指定できます。今回の場合は検証のため「 * * * * *」を指定しています。

filterでは今回ElasticsearchのマッピングMysqlのテーブルカラムと完全には一致しておらず追加分があるためmutateのadd_fieldで追加した分の指定をしています。mutateではfieldの追加や変更、削除ができます。

outputでは今回データの同期先がElasticsearchなので、Elasticsearchのhostsの指定、document_idにセットするカラムの値やIndexの指定をしています。 参照

続いてconfigの下にある設定ファイルです。

pipeline.ordered: auto

http.host: "0.0.0.0"
xpack.monitoring.elasticsearch.hosts: ["http://elasticsearch:9200"]
- pipeline.id: tickets
  path.config: "/usr/share/logstash/pipeline/tickets_jdbc.conf"
  queue.type: persisted

動作確認

設定は以上です。これでMysql、Elasticsearch、logstashを全て立ち上げてエラーが出ていなければ、設定に問題ないことは確認できます。 Mysqlに以下のようにデータを入れるとちゃんとlogstashに同期されるのが確認できると思います

f:id:nakaearth:20220327080246p:plain
mysql

感想

自前で同期させるよりも、このようなミドルウェアの組み合わせで自動で出来る様にするのは筋が良いと感じました。 条件が合えば、この方式を採用するのもありだなと思います。