IBM MQ에서 Kafka, MongoDB로 XML 메시지 스트리밍
Note
This same pattern for ingesting XML will work with other connectors such as JMS and ActiveMQ.
다음을 포함하는 Docker Compose 스택이 실행 중입니다.
일부 테스트 데이터를 IBM MQ에 로드
XML 파일에서 큐로 일부 메시지를 로드해 보겠습니다.
docker exec --interactive ibmmq \
/opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1 < data/note.xml
IBM MQ에서 Kafka로 스트리밍 및 XML 메시지 변환
이제 Kafka Connect 플러그인 및 IbmMQSourceConnector과 함께 XML Transformation을 사용하여 이를 Kafka로 수집할 수 있습니다.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-ibmmq-note-01/config \
-d '{
"connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
"kafka.topic":"ibmmq-note-01",
"mq.hostname":"ibmmq",
"mq.port":"1414",
"mq.queue.manager":"QM1",
"mq.transport.type":"client",
"mq.channel":"DEV.APP.SVRCONN",
"mq.username":"app",
"mq.password":"password123",
"jms.destination.name":"DEV.QUEUE.1",
"jms.destination.type":"queue",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"broker:29092",
"confluent.topic.replication.factor":"1",
"transforms": "extractPayload,xml",
"transforms.extractPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractPayload.field": "text",
"transforms.xml.type": "com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value",
"transforms.xml.schema.path": "file:///data/note.xsd",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
Note
ExtractField
is needed otherwise the XML transform will fail with java.lang.UnsupportedOperationException: STRUCT is not a supported type.
since it will be trying to operate on the entire payload from IBM MQ which includes fields other than the XML that we’re interested in.
결과 Kafka 주제는 Avro에서 직렬화된 메시지의 text
필드 값을 보유합니다.
docker exec kafkacat \
kafkacat \
-b broker:29092 \
-r http://schema-registry:8081 \
-s key=s -s value=avro \
-t ibmmq-note-01 \
-C -o beginning -u -q -J | \
jq -c '.payload'
{"Note":{"to":"Tove","from":"Jani","heading":"Reminder 01","body":"Don't forget me this weekend!"}}
{"Note":{"to":"Jani","from":"Tove","heading":"Reminder 02","body":"Of course I won't!"}}
…
XML 데이터를 Kafka로 가져오는 것과 관련된 개념에 대한 자세한 내용을 이해하기 위해 Kafka Connect 및 XML 변환의 세부 사항에 대해 작성했습니다.
Kafka에서 MongoDB로 데이터 스트리밍
그런 다음 official plugin for Kafka Connect from MongoDB 을 사용하여 다른 Kafka Connect 커넥터를 파이프라인에 추가할 수 있습니다. 그러면 Kafka 항목에서 MongoDB로 데이터가 바로 스트리밍됩니다.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-mongodb-note-01/config \
-d '{
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics":"ibmmq-note-01",
"connection.uri":"mongodb://mongodb:27017",
"database":"rmoff",
"collection":"notes",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
MongoDB에서 데이터를 확인하십시오.
docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("9aae83c4-0e25-43a9-aca5-7278d366423b") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot ð¤·ââï¸" }
bye
다른 레코드를 MQ로 전송하여 이것이 실제로 스트리밍되는지 확인합니다.
echo "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 05</heading> <body>Srsly?</body> </note>" | docker exec --interactive ibmmq /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1
Sample AMQSPUT0 start
target queue is DEV.QUEUE.1
Sample AMQSPUT0 end
그리고 MongoDB의 새로운 레코드를 보십시오.
docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("2641e93e-9c5d-4270-8f64-e52295a60309") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot ð¤·ââï¸" }
{ "_id" : ObjectId("5f77b77cee00df1cc80135a6"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 05", "body" : "Srsly?" }
bye
내 데이터가 XML 형식이 아니면 어떻게 합니까? 페이로드에서 다른 필드를 원하면 어떻게 합니까?
위의 예에서 우리는 소스 시스템(IBM MQ)에서 데이터를 가져오고 Kafka Connect는 그 안에 있는 text
라는 필드에 스키마를 적용합니다(XML 변환은 제공된 XSD를 기반으로 이 작업을 수행함). Kafka에 작성되면 Avro가 Schema Registry에 스키마를 저장하는 선택된 변환기를 사용하여 직렬화됩니다. 모든 소비자가 사용할 수 있도록 스키마를 유지하므로 이것이 좋은 작업 방법입니다. 원하는 경우 여기에서도 Protobuf 또는 JSON 스키마를 사용할 수 있습니다. 이 모든 것이 이해가 되지 않는다면 다음을 확인하십시오.
그러나 IBM MQ에서 오는 전체 페이로드는 다음과 같습니다.
messageID=ID:414d5120514d3120202020202020202060e67a5f06352924
messageType=text
timestamp=1601893142430
deliveryMode=1
redelivered=false
expiration=0
priority=0
properties={JMS_IBM_Format=Struct{propertyType=string,string=MQSTR },
JMS_IBM_PutDate=Struct{propertyType=string,string=20201005},
JMS_IBM_Character_Set=Struct{propertyType=string,string=ISO-8859-1},
JMSXDeliveryCount=Struct{propertyType=integer,integer=1},
JMS_IBM_MsgType=Struct{propertyType=integer,integer=8},
JMSXUserID=Struct{propertyType=string,string=mqm },
JMS_IBM_Encoding=Struct{propertyType=integer,integer=546},
JMS_IBM_PutTime=Struct{propertyType=string,string=10190243},
JMSXAppID=Struct{propertyType=string,string=amqsput },
JMS_IBM_PutApplType=Struct{propertyType=integer,integer=6}}
text=<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> <body>Of course I won't!</body> </note>
이러한 필드의 일부 또는 전부를 유지하려면 다른 방식으로 접근해야 합니다. 현재 상태로는 비 XML 필드와 XML 필드를 모두 가져 와서 단일 구조 스키마로 랭글링할 수 있는 단일 메시지 변환이 없다는 것을 알고 있습니다. Kafka 메시지 헤더에 XML 필드). 기본적으로 IBM MQ Source Connector은 전체 페이로드를 schema에 기록합니다. 즉, 여전히 스키마 지원 직렬화 방법을 사용하지만 text
페이로드 필드는 구문 분석되지 않은 상태로 남아 있습니다.
예를 들면 다음과 같습니다.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-ibmmq-note-03/config \
-d '{
"connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
"kafka.topic":"ibmmq-note-03",
"mq.hostname":"ibmmq",
"mq.port":"1414",
"mq.queue.manager":"QM1",
"mq.transport.type":"client",
"mq.channel":"DEV.APP.SVRCONN",
"mq.username":"app",
"mq.password":"password123",
"jms.destination.name":"DEV.QUEUE.1",
"jms.destination.type":"queue",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"broker:29092",
"confluent.topic.replication.factor":"1",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
이제 전체 IBM MQ 메시지가 스키마로 직렬화된 Kafka 주제에 기록됩니다. kafkacat과 같은 것으로 역직렬화할 수 있습니다.
kafkacat \
-b broker:29092 \
-r http://schema-registry:8081 \
-s key=s -s value=avro \
-t ibmmq-note-03 \
-C -c1 -o beginning -u -q -J | \
jq '.'
{
"topic": "ibmmq-note-03",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1601894073400,
"broker": 1,
"key": "Struct{messageID=ID:414d5120514d3120202020202020202060e67a5f033a2924}",
"payload": {
"messageID": "ID:414d5120514d3120202020202020202060e67a5f033a2924",
"messageType": "text",
"timestamp": 1601894073400,
"deliveryMode": 1,
"properties": {
"JMS_IBM_Format": {
"propertyType": "string",
"boolean": null,
"byte": null,
"short": null,
"integer": null,
"long": null,
"float": null,
"double": null,
"string": {
"string": "MQSTR "
}
},
…
"map": null,
"text": {
"string": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>"
}
}
}
text
필드는 [무슨 일이] XML을 담고 있는 문자열이라는 것을 관찰하십시오.
ksqlDB를 사용하여 어느 정도 데이터 작업을 할 수 있습니다. 현재는 없지만support for handing the XML:
SELECT "PROPERTIES"['JMSXAppID']->STRING as JMSXAppID,
"PROPERTIES"['JMS_IBM_PutTime']->STRING as JMS_IBM_PutTime,
"PROPERTIES"['JMSXDeliveryCount']->INTEGER as JMSXDeliveryCount,
"PROPERTIES"['JMSXUserID']->STRING as JMSXUserID,
text
FROM IBMMQ_SOURCE
EMIT CHANGES;
+-----------+-----------------+-------------------+------------+------------------------------------+
|JMSXAPPID |JMS_IBM_PUTTIME |JMSXDELIVERYCOUNT |JMSXUSERID |TEXT |
+-----------+-----------------+-------------------+------------+------------------------------------+
|amqsput |10302905 |1 |mqm |<note> <to>Jani</to> <from>Tove</fro|
| | | | |m> <heading>Reminder 02</heading> <b|
| | | | |ody>Of course I won't!</body> </note|
| | | | |> |
|amqsput |10302905 |1 |mqm |<note> <to>Tove</to> <from>Jani</fro|
| | | | |m> <heading>Reminder 03</heading> <b|
| | | | |ody>Where are you?</body> </note> |
👾 사용해 보세요!
GitHub에서 Docker Compose를 사용하여 직접 실행하는 코드를 찾을 수 있습니다.
Reference
이 문제에 관하여(IBM MQ에서 Kafka, MongoDB로 XML 메시지 스트리밍), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/rmoff/streaming-xml-messages-from-ibm-mq-into-kafka-into-mongodb-23m8
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
docker exec --interactive ibmmq \
/opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1 < data/note.xml
이제 Kafka Connect 플러그인 및 IbmMQSourceConnector과 함께 XML Transformation을 사용하여 이를 Kafka로 수집할 수 있습니다.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-ibmmq-note-01/config \
-d '{
"connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
"kafka.topic":"ibmmq-note-01",
"mq.hostname":"ibmmq",
"mq.port":"1414",
"mq.queue.manager":"QM1",
"mq.transport.type":"client",
"mq.channel":"DEV.APP.SVRCONN",
"mq.username":"app",
"mq.password":"password123",
"jms.destination.name":"DEV.QUEUE.1",
"jms.destination.type":"queue",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"broker:29092",
"confluent.topic.replication.factor":"1",
"transforms": "extractPayload,xml",
"transforms.extractPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractPayload.field": "text",
"transforms.xml.type": "com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value",
"transforms.xml.schema.path": "file:///data/note.xsd",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
Note
ExtractField
is needed otherwise the XML transform will fail withjava.lang.UnsupportedOperationException: STRUCT is not a supported type.
since it will be trying to operate on the entire payload from IBM MQ which includes fields other than the XML that we’re interested in.
결과 Kafka 주제는 Avro에서 직렬화된 메시지의
text
필드 값을 보유합니다.docker exec kafkacat \
kafkacat \
-b broker:29092 \
-r http://schema-registry:8081 \
-s key=s -s value=avro \
-t ibmmq-note-01 \
-C -o beginning -u -q -J | \
jq -c '.payload'
{"Note":{"to":"Tove","from":"Jani","heading":"Reminder 01","body":"Don't forget me this weekend!"}}
{"Note":{"to":"Jani","from":"Tove","heading":"Reminder 02","body":"Of course I won't!"}}
…
XML 데이터를 Kafka로 가져오는 것과 관련된 개념에 대한 자세한 내용을 이해하기 위해 Kafka Connect 및 XML 변환의 세부 사항에 대해 작성했습니다.
Kafka에서 MongoDB로 데이터 스트리밍
그런 다음 official plugin for Kafka Connect from MongoDB 을 사용하여 다른 Kafka Connect 커넥터를 파이프라인에 추가할 수 있습니다. 그러면 Kafka 항목에서 MongoDB로 데이터가 바로 스트리밍됩니다.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-mongodb-note-01/config \
-d '{
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics":"ibmmq-note-01",
"connection.uri":"mongodb://mongodb:27017",
"database":"rmoff",
"collection":"notes",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
MongoDB에서 데이터를 확인하십시오.
docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("9aae83c4-0e25-43a9-aca5-7278d366423b") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot ð¤·ââï¸" }
bye
다른 레코드를 MQ로 전송하여 이것이 실제로 스트리밍되는지 확인합니다.
echo "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 05</heading> <body>Srsly?</body> </note>" | docker exec --interactive ibmmq /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1
Sample AMQSPUT0 start
target queue is DEV.QUEUE.1
Sample AMQSPUT0 end
그리고 MongoDB의 새로운 레코드를 보십시오.
docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("2641e93e-9c5d-4270-8f64-e52295a60309") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot ð¤·ââï¸" }
{ "_id" : ObjectId("5f77b77cee00df1cc80135a6"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 05", "body" : "Srsly?" }
bye
내 데이터가 XML 형식이 아니면 어떻게 합니까? 페이로드에서 다른 필드를 원하면 어떻게 합니까?
위의 예에서 우리는 소스 시스템(IBM MQ)에서 데이터를 가져오고 Kafka Connect는 그 안에 있는 text
라는 필드에 스키마를 적용합니다(XML 변환은 제공된 XSD를 기반으로 이 작업을 수행함). Kafka에 작성되면 Avro가 Schema Registry에 스키마를 저장하는 선택된 변환기를 사용하여 직렬화됩니다. 모든 소비자가 사용할 수 있도록 스키마를 유지하므로 이것이 좋은 작업 방법입니다. 원하는 경우 여기에서도 Protobuf 또는 JSON 스키마를 사용할 수 있습니다. 이 모든 것이 이해가 되지 않는다면 다음을 확인하십시오.
그러나 IBM MQ에서 오는 전체 페이로드는 다음과 같습니다.
messageID=ID:414d5120514d3120202020202020202060e67a5f06352924
messageType=text
timestamp=1601893142430
deliveryMode=1
redelivered=false
expiration=0
priority=0
properties={JMS_IBM_Format=Struct{propertyType=string,string=MQSTR },
JMS_IBM_PutDate=Struct{propertyType=string,string=20201005},
JMS_IBM_Character_Set=Struct{propertyType=string,string=ISO-8859-1},
JMSXDeliveryCount=Struct{propertyType=integer,integer=1},
JMS_IBM_MsgType=Struct{propertyType=integer,integer=8},
JMSXUserID=Struct{propertyType=string,string=mqm },
JMS_IBM_Encoding=Struct{propertyType=integer,integer=546},
JMS_IBM_PutTime=Struct{propertyType=string,string=10190243},
JMSXAppID=Struct{propertyType=string,string=amqsput },
JMS_IBM_PutApplType=Struct{propertyType=integer,integer=6}}
text=<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> <body>Of course I won't!</body> </note>
이러한 필드의 일부 또는 전부를 유지하려면 다른 방식으로 접근해야 합니다. 현재 상태로는 비 XML 필드와 XML 필드를 모두 가져 와서 단일 구조 스키마로 랭글링할 수 있는 단일 메시지 변환이 없다는 것을 알고 있습니다. Kafka 메시지 헤더에 XML 필드). 기본적으로 IBM MQ Source Connector은 전체 페이로드를 schema에 기록합니다. 즉, 여전히 스키마 지원 직렬화 방법을 사용하지만 text
페이로드 필드는 구문 분석되지 않은 상태로 남아 있습니다.
예를 들면 다음과 같습니다.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-ibmmq-note-03/config \
-d '{
"connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
"kafka.topic":"ibmmq-note-03",
"mq.hostname":"ibmmq",
"mq.port":"1414",
"mq.queue.manager":"QM1",
"mq.transport.type":"client",
"mq.channel":"DEV.APP.SVRCONN",
"mq.username":"app",
"mq.password":"password123",
"jms.destination.name":"DEV.QUEUE.1",
"jms.destination.type":"queue",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"broker:29092",
"confluent.topic.replication.factor":"1",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
이제 전체 IBM MQ 메시지가 스키마로 직렬화된 Kafka 주제에 기록됩니다. kafkacat과 같은 것으로 역직렬화할 수 있습니다.
kafkacat \
-b broker:29092 \
-r http://schema-registry:8081 \
-s key=s -s value=avro \
-t ibmmq-note-03 \
-C -c1 -o beginning -u -q -J | \
jq '.'
{
"topic": "ibmmq-note-03",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1601894073400,
"broker": 1,
"key": "Struct{messageID=ID:414d5120514d3120202020202020202060e67a5f033a2924}",
"payload": {
"messageID": "ID:414d5120514d3120202020202020202060e67a5f033a2924",
"messageType": "text",
"timestamp": 1601894073400,
"deliveryMode": 1,
"properties": {
"JMS_IBM_Format": {
"propertyType": "string",
"boolean": null,
"byte": null,
"short": null,
"integer": null,
"long": null,
"float": null,
"double": null,
"string": {
"string": "MQSTR "
}
},
…
"map": null,
"text": {
"string": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>"
}
}
}
text
필드는 [무슨 일이] XML을 담고 있는 문자열이라는 것을 관찰하십시오.
ksqlDB를 사용하여 어느 정도 데이터 작업을 할 수 있습니다. 현재는 없지만support for handing the XML:
SELECT "PROPERTIES"['JMSXAppID']->STRING as JMSXAppID,
"PROPERTIES"['JMS_IBM_PutTime']->STRING as JMS_IBM_PutTime,
"PROPERTIES"['JMSXDeliveryCount']->INTEGER as JMSXDeliveryCount,
"PROPERTIES"['JMSXUserID']->STRING as JMSXUserID,
text
FROM IBMMQ_SOURCE
EMIT CHANGES;
+-----------+-----------------+-------------------+------------+------------------------------------+
|JMSXAPPID |JMS_IBM_PUTTIME |JMSXDELIVERYCOUNT |JMSXUSERID |TEXT |
+-----------+-----------------+-------------------+------------+------------------------------------+
|amqsput |10302905 |1 |mqm |<note> <to>Jani</to> <from>Tove</fro|
| | | | |m> <heading>Reminder 02</heading> <b|
| | | | |ody>Of course I won't!</body> </note|
| | | | |> |
|amqsput |10302905 |1 |mqm |<note> <to>Tove</to> <from>Jani</fro|
| | | | |m> <heading>Reminder 03</heading> <b|
| | | | |ody>Where are you?</body> </note> |
👾 사용해 보세요!
GitHub에서 Docker Compose를 사용하여 직접 실행하는 코드를 찾을 수 있습니다.
Reference
이 문제에 관하여(IBM MQ에서 Kafka, MongoDB로 XML 메시지 스트리밍), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/rmoff/streaming-xml-messages-from-ibm-mq-into-kafka-into-mongodb-23m8
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-mongodb-note-01/config \
-d '{
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics":"ibmmq-note-01",
"connection.uri":"mongodb://mongodb:27017",
"database":"rmoff",
"collection":"notes",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("9aae83c4-0e25-43a9-aca5-7278d366423b") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot ð¤·ââï¸" }
bye
echo "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 05</heading> <body>Srsly?</body> </note>" | docker exec --interactive ibmmq /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1
Sample AMQSPUT0 start
target queue is DEV.QUEUE.1
Sample AMQSPUT0 end
docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("2641e93e-9c5d-4270-8f64-e52295a60309") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot ð¤·ââï¸" }
{ "_id" : ObjectId("5f77b77cee00df1cc80135a6"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 05", "body" : "Srsly?" }
bye
위의 예에서 우리는 소스 시스템(IBM MQ)에서 데이터를 가져오고 Kafka Connect는 그 안에 있는
text
라는 필드에 스키마를 적용합니다(XML 변환은 제공된 XSD를 기반으로 이 작업을 수행함). Kafka에 작성되면 Avro가 Schema Registry에 스키마를 저장하는 선택된 변환기를 사용하여 직렬화됩니다. 모든 소비자가 사용할 수 있도록 스키마를 유지하므로 이것이 좋은 작업 방법입니다. 원하는 경우 여기에서도 Protobuf 또는 JSON 스키마를 사용할 수 있습니다. 이 모든 것이 이해가 되지 않는다면 다음을 확인하십시오.그러나 IBM MQ에서 오는 전체 페이로드는 다음과 같습니다.
messageID=ID:414d5120514d3120202020202020202060e67a5f06352924
messageType=text
timestamp=1601893142430
deliveryMode=1
redelivered=false
expiration=0
priority=0
properties={JMS_IBM_Format=Struct{propertyType=string,string=MQSTR },
JMS_IBM_PutDate=Struct{propertyType=string,string=20201005},
JMS_IBM_Character_Set=Struct{propertyType=string,string=ISO-8859-1},
JMSXDeliveryCount=Struct{propertyType=integer,integer=1},
JMS_IBM_MsgType=Struct{propertyType=integer,integer=8},
JMSXUserID=Struct{propertyType=string,string=mqm },
JMS_IBM_Encoding=Struct{propertyType=integer,integer=546},
JMS_IBM_PutTime=Struct{propertyType=string,string=10190243},
JMSXAppID=Struct{propertyType=string,string=amqsput },
JMS_IBM_PutApplType=Struct{propertyType=integer,integer=6}}
text=<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> <body>Of course I won't!</body> </note>
이러한 필드의 일부 또는 전부를 유지하려면 다른 방식으로 접근해야 합니다. 현재 상태로는 비 XML 필드와 XML 필드를 모두 가져 와서 단일 구조 스키마로 랭글링할 수 있는 단일 메시지 변환이 없다는 것을 알고 있습니다. Kafka 메시지 헤더에 XML 필드). 기본적으로 IBM MQ Source Connector은 전체 페이로드를 schema에 기록합니다. 즉, 여전히 스키마 지원 직렬화 방법을 사용하지만
text
페이로드 필드는 구문 분석되지 않은 상태로 남아 있습니다.예를 들면 다음과 같습니다.
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-ibmmq-note-03/config \
-d '{
"connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
"kafka.topic":"ibmmq-note-03",
"mq.hostname":"ibmmq",
"mq.port":"1414",
"mq.queue.manager":"QM1",
"mq.transport.type":"client",
"mq.channel":"DEV.APP.SVRCONN",
"mq.username":"app",
"mq.password":"password123",
"jms.destination.name":"DEV.QUEUE.1",
"jms.destination.type":"queue",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"broker:29092",
"confluent.topic.replication.factor":"1",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
이제 전체 IBM MQ 메시지가 스키마로 직렬화된 Kafka 주제에 기록됩니다. kafkacat과 같은 것으로 역직렬화할 수 있습니다.
kafkacat \
-b broker:29092 \
-r http://schema-registry:8081 \
-s key=s -s value=avro \
-t ibmmq-note-03 \
-C -c1 -o beginning -u -q -J | \
jq '.'
{
"topic": "ibmmq-note-03",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1601894073400,
"broker": 1,
"key": "Struct{messageID=ID:414d5120514d3120202020202020202060e67a5f033a2924}",
"payload": {
"messageID": "ID:414d5120514d3120202020202020202060e67a5f033a2924",
"messageType": "text",
"timestamp": 1601894073400,
"deliveryMode": 1,
"properties": {
"JMS_IBM_Format": {
"propertyType": "string",
"boolean": null,
"byte": null,
"short": null,
"integer": null,
"long": null,
"float": null,
"double": null,
"string": {
"string": "MQSTR "
}
},
…
"map": null,
"text": {
"string": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>"
}
}
}
text
필드는 [무슨 일이] XML을 담고 있는 문자열이라는 것을 관찰하십시오.ksqlDB를 사용하여 어느 정도 데이터 작업을 할 수 있습니다. 현재는 없지만support for handing the XML:
SELECT "PROPERTIES"['JMSXAppID']->STRING as JMSXAppID,
"PROPERTIES"['JMS_IBM_PutTime']->STRING as JMS_IBM_PutTime,
"PROPERTIES"['JMSXDeliveryCount']->INTEGER as JMSXDeliveryCount,
"PROPERTIES"['JMSXUserID']->STRING as JMSXUserID,
text
FROM IBMMQ_SOURCE
EMIT CHANGES;
+-----------+-----------------+-------------------+------------+------------------------------------+
|JMSXAPPID |JMS_IBM_PUTTIME |JMSXDELIVERYCOUNT |JMSXUSERID |TEXT |
+-----------+-----------------+-------------------+------------+------------------------------------+
|amqsput |10302905 |1 |mqm |<note> <to>Jani</to> <from>Tove</fro|
| | | | |m> <heading>Reminder 02</heading> <b|
| | | | |ody>Of course I won't!</body> </note|
| | | | |> |
|amqsput |10302905 |1 |mqm |<note> <to>Tove</to> <from>Jani</fro|
| | | | |m> <heading>Reminder 03</heading> <b|
| | | | |ody>Where are you?</body> </note> |
👾 사용해 보세요!
GitHub에서 Docker Compose를 사용하여 직접 실행하는 코드를 찾을 수 있습니다.
Reference
이 문제에 관하여(IBM MQ에서 Kafka, MongoDB로 XML 메시지 스트리밍), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다
https://dev.to/rmoff/streaming-xml-messages-from-ibm-mq-into-kafka-into-mongodb-23m8
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념
(Collection and Share based on the CC Protocol.)
Reference
이 문제에 관하여(IBM MQ에서 Kafka, MongoDB로 XML 메시지 스트리밍), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/rmoff/streaming-xml-messages-from-ibm-mq-into-kafka-into-mongodb-23m8텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)