Kafka로 XML 데이터 수집 - 옵션 3: Kafka Connect FilePulse 커넥터
curl
를 통해 파이프된 xq
와 같은 소스를 사용하여 XML에 대한 수집 파이프라인을 Kafka로 해킹하여 XML을 랭글링하고 kafkacat
를 사용하여 Kafka로 스트리밍하는 방법을 보았습니다. 선택적으로 ksqlDB를 사용하여 적용하고 등록합니다. 그것을 위한 스키마.Kafka Connect 소스 커넥터와
kafka-connect-transform-xml
단일 메시지 변환의 사용을 보여주었습니다. 이제 XML 데이터를 Kafka로 수집하는 데 사용할 수 있는 커뮤니티의 소스 커넥터를 살펴보겠습니다.FilePulse은 에서 작성한 Apache 2.0 라이센스 커넥터입니다. XML을 포함하여 다양한 형식의 플랫 파일에서 수집을 지원합니다. Florian은 이에 대해 유용한 블로그를 작성했습니다.
Kafka Connect 및 FilePulse 커넥터를 사용하여 XML 데이터를 Kafka로 수집
simple XML source file을 사용하여 먼저 이것을 기반으로 복사했습니다.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/data/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"offset.strategy":"name",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"topic":"books-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"_connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
이것은 Kafka Connect가 루트 요소( x:books
)를 Avro로 직렬화하려고 시도한 지점에서 실패했습니다.
Caused by: org.apache.avro.SchemaParseException: Illegal character in: X:books
at org.apache.avro.Schema.validateName(Schema.java:1530)
at org.apache.avro.Schema.access$400(Schema.java:87)
at org.apache.avro.Schema$Name.<init>(Schema.java:673)
at org.apache.avro.Schema.createRecord(Schema.java:212)
XML은 다음과 같습니다.
<?xml version="1.0"?>
<x:books xmlns:x="urn:books">
<book id="bk001">
<author>Writer</author>
<title>The First Book</title>
…
어쨌든 루트 요소를 원하지 않기 때문에 xpath.expression
구성 요소로 XPath를 사용하여 원하는 비트를 지정할 수 있습니다.
XPath를 알아내는 유용한 방법은 실행xmllint --shell <your xml file>
하고 구조를 탐색하여 알아내는 것입니다. 오래된 기술의 좋은 점은 Google에 과거에 같은 문제를 겪었던 사람들의 엄청난 리소스가 있다는 것입니다. this from 2010 이 글을 작성하는 데 도움이 되었습니다! 내 XPath 표현식은 간단히 /*/book
다음과 같습니다.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-01/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/data/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"offset.strategy":"name",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"xpath.expression": "/*/book",
"topic":"books-01",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"_connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
이것은 효과가 있었고 주제에 대해 소비자를 사용하는 것을 확인할 수 있습니다. 여기서는 더 빠르기 때문에 ksqlDB를 사용하고 있습니다.
ksql> PRINT 'books-01' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2020/10/02 11:26:45.222 Z, key: <null>, value: {"id": "bk001", "author": "Writer", "title": "The First Book", "genre": "Fiction", "price": "44.95", "pub_date": "2000-10-01", "review": "An amazing story of nothing."}
rowtime: 2020/10/02 11:26:45.226 Z, key: <null>, value: {"id": "bk002", "author": "Poet", "title": "The Poet's First Poem", "genre": "Poem", "price": "24.95", "pub_date": "2000-10-01", "review": "Least poetic poems."}
값은 XML 자체에서 유추된 스키마를 사용하여 Avro로 직렬화되었습니다. Schema Registry에서 조회하여 확인할 수 있습니다.
docker exec --tty schema-registry \
curl -s "http://localhost:8081/subjects/books-01-value/versions/1" | \
jq '.schema|fromjson[1]'
{
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [
{ "name": "id", "type": [ "null", "string" ], "default": null },
{ "name": "author", "type": [ "null", "string" ], "default": null },
{ "name": "title", "type": [ "null", "string" ], "default": null },
…
Avro는 Kafka Connect 작업자 구성에서 기본 변환기로 설정됩니다. 예를 들어 필요한 value.converter
구성을 설정하여 Protobuf를 사용하려는 경우 이를 재정의할 수 있습니다.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-02/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/data/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"offset.strategy":"name",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"xpath.expression": "/*/book",
"topic":"books-02",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"_connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1,
"value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
이번에는 데이터가 Protobuf로 작성되었으며 ksqlDB에서도 확인할 수 있습니다(메시지를 읽을 때 직렬화 방법을 가장 잘 추측하고 적절한 디시리얼라이저를 자동으로 선택함).
ksql> PRINT 'books-02' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: PROTOBUF or KAFKA_STRING
rowtime: 2020/10/02 11:31:34.066 Z, key: <null>, value: id: "bk001" author: "Writer" title: "The First Book" genre: "Fiction" price: "44.95" pub_date: "2000-10-01" review: "An amazing story of nothing."
rowtime: 2020/10/02 11:31:34.068 Z, key: <null>, value: id: "bk002" author: "Poet" title: "The Poet\'s First Poem" genre: "Poem" price: "24.95" pub_date: "2000-10-01" review: "Least poetic poems."
약간의 ksqlDB
플랫 파일에서 Kafka 주제로 스트리밍되는 데이터를 사용하여 다음을 수행할 수 있습니다.
ksql> CREATE STREAM BOOKS WITH (KAFKA_TOPIC='books-02',VALUE_FORMAT='PROTOBUF');
Message
----------------
Stream created
----------------
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM BOOKS EMIT CHANGES LIMIT 2;
+--------+---------+-----------------------+---------+--------+------------+----------------------------+
|ID |AUTHOR |TITLE |GENRE |PRICE |PUB_DATE |REVIEW |
+--------+---------+-----------------------+---------+--------+------------+----------------------------+
|bk001 |Writer |The First Book |Fiction |44.95 |2000-10-01 |An amazing story of nothing |
|bk002 |Poet |The Poet's First Poem |Poem |24.95 |2000-10-01 |Least poetic poems. |
Limit Reached
Query terminated
FilePulse를 사용한 XML 수집의 더 많은 순열은 확인하십시오.
XML을 Kafka로 가져오기 위한 다른 옵션은 무엇입니까?
FilePulse는 여기에서 훌륭하게 작동했으며 분명히 처리 및 파일 처리 옵션lot of flexibility이 있습니다. XSD 없이도 XML에서 페이로드의 스키마를 유추할 수 있다는 점도 매우 편리합니다.
그러나 데이터가 플랫 파일에 없으면 어떻게 될까요? 불행히도 이 상황에서는 다른 옵션을 찾아야 합니다.
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-00/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/data/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"offset.strategy":"name",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"topic":"books-00",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"_connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
Caused by: org.apache.avro.SchemaParseException: Illegal character in: X:books
at org.apache.avro.Schema.validateName(Schema.java:1530)
at org.apache.avro.Schema.access$400(Schema.java:87)
at org.apache.avro.Schema$Name.<init>(Schema.java:673)
at org.apache.avro.Schema.createRecord(Schema.java:212)
<?xml version="1.0"?>
<x:books xmlns:x="urn:books">
<book id="bk001">
<author>Writer</author>
<title>The First Book</title>
…
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-01/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/data/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"offset.strategy":"name",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"xpath.expression": "/*/book",
"topic":"books-01",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"_connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1
}'
ksql> PRINT 'books-01' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2020/10/02 11:26:45.222 Z, key: <null>, value: {"id": "bk001", "author": "Writer", "title": "The First Book", "genre": "Fiction", "price": "44.95", "pub_date": "2000-10-01", "review": "An amazing story of nothing."}
rowtime: 2020/10/02 11:26:45.226 Z, key: <null>, value: {"id": "bk002", "author": "Poet", "title": "The Poet's First Poem", "genre": "Poem", "price": "24.95", "pub_date": "2000-10-01", "review": "Least poetic poems."}
docker exec --tty schema-registry \
curl -s "http://localhost:8081/subjects/books-01-value/versions/1" | \
jq '.schema|fromjson[1]'
{
"type": "record",
"name": "ConnectDefault",
"namespace": "io.confluent.connect.avro",
"fields": [
{ "name": "id", "type": [ "null", "string" ], "default": null },
{ "name": "author", "type": [ "null", "string" ], "default": null },
{ "name": "title", "type": [ "null", "string" ], "default": null },
…
curl -i -X PUT -H "Accept:application/json" \
-H "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-02/config \
-d '{
"connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
"fs.scan.directory.path":"/data/",
"fs.scan.interval.ms":"10000",
"fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
"file.filter.regex.pattern":".*\\.xml$",
"offset.strategy":"name",
"task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
"xpath.expression": "/*/book",
"topic":"books-02",
"internal.kafka.reporter.bootstrap.servers": "broker:29092",
"internal.kafka.reporter.topic":"_connect-file-pulse-status",
"fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
"tasks.max": 1,
"value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
ksql> PRINT 'books-02' FROM BEGINNING;
Key format: ¯\_(ツ)_/¯ - no data processed
Value format: PROTOBUF or KAFKA_STRING
rowtime: 2020/10/02 11:31:34.066 Z, key: <null>, value: id: "bk001" author: "Writer" title: "The First Book" genre: "Fiction" price: "44.95" pub_date: "2000-10-01" review: "An amazing story of nothing."
rowtime: 2020/10/02 11:31:34.068 Z, key: <null>, value: id: "bk002" author: "Poet" title: "The Poet\'s First Poem" genre: "Poem" price: "24.95" pub_date: "2000-10-01" review: "Least poetic poems."
플랫 파일에서 Kafka 주제로 스트리밍되는 데이터를 사용하여 다음을 수행할 수 있습니다.
ksql> CREATE STREAM BOOKS WITH (KAFKA_TOPIC='books-02',VALUE_FORMAT='PROTOBUF');
Message
----------------
Stream created
----------------
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM BOOKS EMIT CHANGES LIMIT 2;
+--------+---------+-----------------------+---------+--------+------------+----------------------------+
|ID |AUTHOR |TITLE |GENRE |PRICE |PUB_DATE |REVIEW |
+--------+---------+-----------------------+---------+--------+------------+----------------------------+
|bk001 |Writer |The First Book |Fiction |44.95 |2000-10-01 |An amazing story of nothing |
|bk002 |Poet |The Poet's First Poem |Poem |24.95 |2000-10-01 |Least poetic poems. |
Limit Reached
Query terminated
FilePulse를 사용한 XML 수집의 더 많은 순열은 확인하십시오.
XML을 Kafka로 가져오기 위한 다른 옵션은 무엇입니까?
FilePulse는 여기에서 훌륭하게 작동했으며 분명히 처리 및 파일 처리 옵션lot of flexibility이 있습니다. XSD 없이도 XML에서 페이로드의 스키마를 유추할 수 있다는 점도 매우 편리합니다.
그러나 데이터가 플랫 파일에 없으면 어떻게 될까요? 불행히도 이 상황에서는 다른 옵션을 찾아야 합니다.
👾 사용해 보세요!
GitHub 에서 Docker Compose를 사용하여 직접 실행하는 코드를 찾을 수 있습니다.
Reference
이 문제에 관하여(Kafka로 XML 데이터 수집 - 옵션 3: Kafka Connect FilePulse 커넥터), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/confluentinc/ingesting-xml-data-into-kafka-option-3-kafka-connect-filepulse-connector-fnb텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)