IBM MQ에서 Kafka, MongoDB로 XML 메시지 스트리밍

IBM MQ의 대기열에 XML 데이터가 있고 이를 Kafka로 수집하여 애플리케이션에서 다운스트림을 사용하거나 MongoDB와 같은 NoSQL 저장소로 스트리밍하려고 한다고 가정해 보겠습니다.

Note

This same pattern for ingesting XML will work with other connectors such as JMS and ActiveMQ.





다음을 포함하는 Docker Compose 스택이 실행 중입니다.
  • IBM MQ
  • Apache Kafka(가장 중요한 스키마 레지스트리를 포함하기 위해 Confluent 플랫폼으로 배포됨)
  • 몽고디비

  • 일부 테스트 데이터를 IBM MQ에 로드



    XML 파일에서 큐로 일부 메시지를 로드해 보겠습니다.

    docker exec --interactive ibmmq \
      /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1 < data/note.xml
    



    IBM MQ에서 Kafka로 스트리밍 및 XML 메시지 변환



    이제 Kafka Connect 플러그인 및 IbmMQSourceConnector과 함께 XML Transformation을 사용하여 이를 Kafka로 수집할 수 있습니다.

    curl -i -X PUT -H  "Content-Type:application/json" \
        http://localhost:8083/connectors/source-ibmmq-note-01/config \
        -d '{
        "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
        "kafka.topic":"ibmmq-note-01",
        "mq.hostname":"ibmmq",
        "mq.port":"1414",
        "mq.queue.manager":"QM1",
        "mq.transport.type":"client",
        "mq.channel":"DEV.APP.SVRCONN",
        "mq.username":"app",
        "mq.password":"password123",
        "jms.destination.name":"DEV.QUEUE.1",
        "jms.destination.type":"queue",
        "confluent.license":"",
        "confluent.topic.bootstrap.servers":"broker:29092",
        "confluent.topic.replication.factor":"1",
        "transforms": "extractPayload,xml",
        "transforms.extractPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
        "transforms.extractPayload.field": "text",
        "transforms.xml.type": "com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value",
        "transforms.xml.schema.path": "file:///data/note.xsd",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://schema-registry:8081"
        }'
    

    Note

    ExtractField is needed otherwise the XML transform will fail with java.lang.UnsupportedOperationException: STRUCT is not a supported type. since it will be trying to operate on the entire payload from IBM MQ which includes fields other than the XML that we’re interested in.



    결과 Kafka 주제는 Avro에서 직렬화된 메시지의 text 필드 값을 보유합니다.

    docker exec kafkacat \
        kafkacat                            \
          -b broker:29092                   \
          -r http://schema-registry:8081    \
          -s key=s -s value=avro            \
          -t ibmmq-note-01                  \
          -C -o beginning -u -q -J | \
        jq -c '.payload'
    



    {"Note":{"to":"Tove","from":"Jani","heading":"Reminder 01","body":"Don't forget me this weekend!"}}
    {"Note":{"to":"Jani","from":"Tove","heading":"Reminder 02","body":"Of course I won't!"}}
    
    

    XML 데이터를 Kafka로 가져오는 것과 관련된 개념에 대한 자세한 내용을 이해하기 위해 Kafka Connect 및 XML 변환의 세부 사항에 대해 작성했습니다.

    Kafka에서 MongoDB로 데이터 스트리밍



    그런 다음 official plugin for Kafka Connect from MongoDB 을 사용하여 다른 Kafka Connect 커넥터를 파이프라인에 추가할 수 있습니다. 그러면 Kafka 항목에서 MongoDB로 데이터가 바로 스트리밍됩니다.

    curl -i -X PUT -H  "Content-Type:application/json" \
        http://localhost:8083/connectors/sink-mongodb-note-01/config \
        -d '{
        "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
        "topics":"ibmmq-note-01",
        "connection.uri":"mongodb://mongodb:27017",
        "database":"rmoff",
        "collection":"notes",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://schema-registry:8081"
        }'
    

    MongoDB에서 데이터를 확인하십시오.

    docker exec --interactive mongodb mongo localhost:27017 <<EOF
    use rmoff
    db.notes.find()
    EOF
    



    MongoDB shell version v4.4.1
    connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
    Implicit session: session { "id" : UUID("9aae83c4-0e25-43a9-aca5-7278d366423b") }
    MongoDB server version: 4.4.1
    switched to db rmoff
    { "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
    { "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
    { "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
    { "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot 🤷‍♂️" }
    bye
    

    다른 레코드를 MQ로 전송하여 이것이 실제로 스트리밍되는지 확인합니다.

    echo "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 05</heading> <body>Srsly?</body> </note>" | docker exec --interactive ibmmq /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1
    
    Sample AMQSPUT0 start
    target queue is DEV.QUEUE.1
    Sample AMQSPUT0 end
    

    그리고 MongoDB의 새로운 레코드를 보십시오.

    docker exec --interactive mongodb mongo localhost:27017 <<EOF
    use rmoff
    db.notes.find()
    EOF
    



    MongoDB shell version v4.4.1
    connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
    Implicit session: session { "id" : UUID("2641e93e-9c5d-4270-8f64-e52295a60309") }
    MongoDB server version: 4.4.1
    switched to db rmoff
    { "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
    { "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
    { "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
    { "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot 🤷‍♂️" }
    { "_id" : ObjectId("5f77b77cee00df1cc80135a6"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 05", "body" : "Srsly?" }
    bye
    

    내 데이터가 XML 형식이 아니면 어떻게 합니까? 페이로드에서 다른 필드를 원하면 어떻게 합니까?



    위의 예에서 우리는 소스 시스템(IBM MQ)에서 데이터를 가져오고 Kafka Connect는 그 안에 있는 text라는 필드에 스키마를 적용합니다(XML 변환은 제공된 XSD를 기반으로 이 작업을 수행함). Kafka에 작성되면 Avro가 Schema Registry에 스키마를 저장하는 선택된 변환기를 사용하여 직렬화됩니다. 모든 소비자가 사용할 수 있도록 스키마를 유지하므로 이것이 좋은 작업 방법입니다. 원하는 경우 여기에서도 Protobuf 또는 JSON 스키마를 사용할 수 있습니다. 이 모든 것이 이해가 되지 않는다면 다음을 확인하십시오.

    그러나 IBM MQ에서 오는 전체 페이로드는 다음과 같습니다.

    messageID=ID:414d5120514d3120202020202020202060e67a5f06352924
    messageType=text
    timestamp=1601893142430
    deliveryMode=1
    redelivered=false
    expiration=0
    priority=0
    properties={JMS_IBM_Format=Struct{propertyType=string,string=MQSTR   }, 
                JMS_IBM_PutDate=Struct{propertyType=string,string=20201005}, 
                JMS_IBM_Character_Set=Struct{propertyType=string,string=ISO-8859-1}, 
                JMSXDeliveryCount=Struct{propertyType=integer,integer=1}, 
                JMS_IBM_MsgType=Struct{propertyType=integer,integer=8}, 
                JMSXUserID=Struct{propertyType=string,string=mqm         }, 
                JMS_IBM_Encoding=Struct{propertyType=integer,integer=546}, 
                JMS_IBM_PutTime=Struct{propertyType=string,string=10190243}, 
                JMSXAppID=Struct{propertyType=string,string=amqsput                     }, 
                JMS_IBM_PutApplType=Struct{propertyType=integer,integer=6}}
    text=<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> <body>Of course I won't!</body> </note>
    

    이러한 필드의 일부 또는 전부를 유지하려면 다른 방식으로 접근해야 합니다. 현재 상태로는 비 XML 필드와 XML 필드를 모두 가져 와서 단일 구조 스키마로 랭글링할 수 있는 단일 메시지 변환이 없다는 것을 알고 있습니다. Kafka 메시지 헤더에 XML 필드). 기본적으로 IBM MQ Source Connector은 전체 페이로드를 schema에 기록합니다. 즉, 여전히 스키마 지원 직렬화 방법을 사용하지만 text 페이로드 필드는 구문 분석되지 않은 상태로 남아 있습니다.

    예를 들면 다음과 같습니다.

    curl -i -X PUT -H  "Content-Type:application/json" \
        http://localhost:8083/connectors/source-ibmmq-note-03/config \
        -d '{
        "connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
        "kafka.topic":"ibmmq-note-03",
        "mq.hostname":"ibmmq",
        "mq.port":"1414",
        "mq.queue.manager":"QM1",
        "mq.transport.type":"client",
        "mq.channel":"DEV.APP.SVRCONN",
        "mq.username":"app",
        "mq.password":"password123",
        "jms.destination.name":"DEV.QUEUE.1",
        "jms.destination.type":"queue",
        "confluent.license":"",
        "confluent.topic.bootstrap.servers":"broker:29092",
        "confluent.topic.replication.factor":"1",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://schema-registry:8081"
        }'
    

    이제 전체 IBM MQ 메시지가 스키마로 직렬화된 Kafka 주제에 기록됩니다. kafkacat과 같은 것으로 역직렬화할 수 있습니다.

    kafkacat                                \
          -b broker:29092                   \
          -r http://schema-registry:8081    \
          -s key=s -s value=avro            \
          -t ibmmq-note-03                  \
          -C -c1 -o beginning -u -q -J | \
        jq  '.'
    



    {
      "topic": "ibmmq-note-03",
      "partition": 0,
      "offset": 0,
      "tstype": "create",
      "ts": 1601894073400,
      "broker": 1,
      "key": "Struct{messageID=ID:414d5120514d3120202020202020202060e67a5f033a2924}",
      "payload": {
        "messageID": "ID:414d5120514d3120202020202020202060e67a5f033a2924",
        "messageType": "text",
        "timestamp": 1601894073400,
        "deliveryMode": 1,
        "properties": {
          "JMS_IBM_Format": {
            "propertyType": "string",
            "boolean": null,
            "byte": null,
            "short": null,
            "integer": null,
            "long": null,
            "float": null,
            "double": null,
            "string": {
              "string": "MQSTR   "
            }
          },
        
        "map": null,
        "text": {
          "string": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>"
        }
      }
    }
    
    text 필드는 [무슨 일이] XML을 담고 있는 문자열이라는 것을 관찰하십시오.

    ksqlDB를 사용하여 어느 정도 데이터 작업을 할 수 있습니다. 현재는 없지만support for handing the XML:

    SELECT "PROPERTIES"['JMSXAppID']->STRING as JMSXAppID,
            "PROPERTIES"['JMS_IBM_PutTime']->STRING as JMS_IBM_PutTime,
            "PROPERTIES"['JMSXDeliveryCount']->INTEGER as JMSXDeliveryCount,
            "PROPERTIES"['JMSXUserID']->STRING as JMSXUserID,
            text
      FROM IBMMQ_SOURCE
      EMIT CHANGES;
    



    +-----------+-----------------+-------------------+------------+------------------------------------+
    |JMSXAPPID  |JMS_IBM_PUTTIME  |JMSXDELIVERYCOUNT  |JMSXUSERID  |TEXT                                |
    +-----------+-----------------+-------------------+------------+------------------------------------+
    |amqsput    |10302905         |1                  |mqm         |<note> <to>Jani</to> <from>Tove</fro|
    |           |                 |                   |            |m> <heading>Reminder 02</heading> <b|
    |           |                 |                   |            |ody>Of course I won't!</body> </note|
    |           |                 |                   |            |>                                   |
    |amqsput    |10302905         |1                  |mqm         |<note> <to>Tove</to> <from>Jani</fro|
    |           |                 |                   |            |m> <heading>Reminder 03</heading> <b|
    |           |                 |                   |            |ody>Where are you?</body> </note>   |
    

    👾 사용해 보세요!



    GitHub에서 Docker Compose를 사용하여 직접 실행하는 코드를 찾을 수 있습니다.

    좋은 웹페이지 즐겨찾기