+-
Kafka Connect 如何使命名空间与数据库名称不可知?

我的环境

MySQL(5.7): 我们有多个模式,命名惯例是{application_name}_env。

例子:我们有两个应用程序:app1和app2。考虑我们有两个应用程序:app1和app2。

开发环境。 数据库名称为app1_dev, app2_dev。

QA环境。 数据库名称为app1_qa, app2_qa.

Debezium(0.8.3). 该插件用于CDC MySQL日志。

连接器配置为:。

{
"name": "connector-1",
"config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "mysql",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "decimal.handling.mode": "double",
    "snapshot.mode": "when_needed",
    "table.whitelist":"{database_name}.account",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms" : "setSchema",
    "transforms.setSchema.type" : "org.apache.kafka.connect.transforms.SetSchemaMetadata$Value",
    "transforms.setSchema.schema.name" : "com.test.Account"
}

}

春天的Java应用 我使用Kafka Consumer(@KafkaListener)来读取Debezium事件的变化。

我提供了avsc文件,并使用gradle avro插件来生成类。

开发环境的模式

{
"type":"record",
"name":"Accounts",
"namespace":"com.test",
"fields":[
  {
     "name":"before",
     "type":[
        "null",
        {
           "type":"record",
           "name":"Value",
           "namespace":"dbserver1.app1_dev.account",
           "fields":[
              {
                 "name":"id",
                 "type":"long"
              }
           ],
           "connect.name":"dbserver1.app1_dev.account.Value"
        }
     ],
     "default":null
  },
  {
     "name":"after",
     "type":[
        "null",
        "dbserver1.app1_dev.account.Value"
     ],
     "default":null
  },
  {
     "name":"source",
     "type":{
        "type":"record",
        "name":"Source",
        "namespace":"io.debezium.connector.mysql",
        "fields":[
           {
              "name":"version",
              "type":[
                 "null",
                 "string"
              ],
              "default":null
           },
           {
              "name":"name",
              "type":"string"
           },
           {
              "name":"server_id",
              "type":"long"
           },
           {
              "name":"ts_sec",
              "type":"long"
           },
           {
              "name":"gtid",
              "type":[
                 "null",
                 "string"
              ],
              "default":null
           },
           {
              "name":"file",
              "type":"string"
           },
           {
              "name":"pos",
              "type":"long"
           },
           {
              "name":"row",
              "type":"int"
           },
           {
              "name":"snapshot",
              "type":[
                 {
                    "type":"boolean",
                    "connect.default":false
                 },
                 "null"
              ],
              "default":false
           },
           {
              "name":"thread",
              "type":[
                 "null",
                 "long"
              ],
              "default":null
           },
           {
              "name":"db",
              "type":[
                 "null",
                 "string"
              ],
              "default":null
           },
           {
              "name":"table",
              "type":[
                 "null",
                 "string"
              ],
              "default":null
           },
           {
              "name":"query",
              "type":[
                 "null",
                 "string"
              ],
              "default":null
           }
        ],
        "connect.name":"io.debezium.connector.mysql.Source"
     }
   },
   {
     "name":"op",
     "type":"string"
   },
   {
     "name":"ts_ms",
     "type":[
        "null",
        "long"
     ],
     "default":null
   }
 ],
 "connect.name":"com.test.Account"
 }

问题:由于我的数据库模式是动态的,即它们以env后缀结束。由于我的数据库模式是动态的,即它们以env后缀结束。

在每个环境下生成的Schema有不同的命名空间。

开发:dev.app1_dev.accountsQA:dev.app1_qa.accounts。

由于命名空间的不同,我无法在QA中反序列化我的开发代码。所以,如果使用了在Dev中生成的模式,代码在QA中就无法工作。

我想确保命名空间在所有环境中是一致的。

0
投票

请使用 org.apache.kafka.connect.transforms.SetSchemaMetadata SMT - 见 https:/github.coma0x8okafkablobmasterconnecttransformssrcmainjavaorgapachekafkaconnecttransformsSetSchemaMetadata.java。