Kafka로 XML 데이터 수집 - 옵션 3: Kafka Connect FilePulse 커넥터

👉
curl를 통해 파이프된 xq와 같은 소스를 사용하여 XML에 대한 수집 파이프라인을 Kafka로 해킹하여 XML을 랭글링하고 kafkacat를 사용하여 Kafka로 스트리밍하는 방법을 보았습니다. 선택적으로 ksqlDB를 사용하여 적용하고 등록합니다. 그것을 위한 스키마.

Kafka Connect 소스 커넥터와 kafka-connect-transform-xml 단일 메시지 변환의 사용을 보여주었습니다. 이제 XML 데이터를 Kafka로 수집하는 데 사용할 수 있는 커뮤니티의 소스 커넥터를 살펴보겠습니다.

FilePulse은 에서 작성한 Apache 2.0 라이센스 커넥터입니다. XML을 포함하여 다양한 형식의 플랫 파일에서 수집을 지원합니다. Florian은 이에 대해 유용한 블로그를 작성했습니다.

Kafka Connect 및 FilePulse 커넥터를 사용하여 XML 데이터를 Kafka로 수집



simple XML source file을 사용하여 먼저 이것을 기반으로 복사했습니다.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-00/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/data/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "offset.strategy":"name",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "topic":"books-00",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"_connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'


이것은 Kafka Connect가 루트 요소( x:books )를 Avro로 직렬화하려고 시도한 지점에서 실패했습니다.

Caused by: org.apache.avro.SchemaParseException: Illegal character in: X:books
        at org.apache.avro.Schema.validateName(Schema.java:1530)
        at org.apache.avro.Schema.access$400(Schema.java:87)
        at org.apache.avro.Schema$Name.<init>(Schema.java:673)
        at org.apache.avro.Schema.createRecord(Schema.java:212)

XML은 다음과 같습니다.

<?xml version="1.0"?>
<x:books xmlns:x="urn:books">
    <book id="bk001">
        <author>Writer</author>
        <title>The First Book</title>


어쨌든 루트 요소를 원하지 않기 때문에 xpath.expression 구성 요소로 XPath를 사용하여 원하는 비트를 지정할 수 있습니다.

XPath를 알아내는 유용한 방법은 실행xmllint --shell <your xml file>하고 구조를 탐색하여 알아내는 것입니다. 오래된 기술의 좋은 점은 Google에 과거에 같은 문제를 겪었던 사람들의 엄청난 리소스가 있다는 것입니다. this from 2010 이 글을 작성하는 데 도움이 되었습니다! 내 XPath 표현식은 간단히 /*/book 다음과 같습니다.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-01/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/data/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "offset.strategy":"name",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "xpath.expression": "/*/book",
        "topic":"books-01",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"_connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1
    }'


이것은 효과가 있었고 주제에 대해 소비자를 사용하는 것을 확인할 수 있습니다. 여기서는 더 빠르기 때문에 ksqlDB를 사용하고 있습니다.

ksql> PRINT 'books-01' FROM BEGINNING;
Key format: ¯\_()_/¯ - no data processed
Value format: AVRO or KAFKA_STRING
rowtime: 2020/10/02 11:26:45.222 Z, key: <null>, value: {"id": "bk001", "author": "Writer", "title": "The First Book", "genre": "Fiction", "price": "44.95", "pub_date": "2000-10-01", "review": "An amazing story of nothing."}
rowtime: 2020/10/02 11:26:45.226 Z, key: <null>, value: {"id": "bk002", "author": "Poet", "title": "The Poet's First Poem", "genre": "Poem", "price": "24.95", "pub_date": "2000-10-01", "review": "Least poetic poems."}


값은 XML 자체에서 유추된 스키마를 사용하여 Avro로 직렬화되었습니다. Schema Registry에서 조회하여 확인할 수 있습니다.

docker exec --tty schema-registry \
    curl -s "http://localhost:8081/subjects/books-01-value/versions/1" | \
    jq '.schema|fromjson[1]'



{
    "type": "record",
    "name": "ConnectDefault",
    "namespace": "io.confluent.connect.avro",
    "fields": [
    { "name": "id", "type": [ "null", "string" ], "default": null },
    { "name": "author", "type": [ "null", "string" ], "default": null },
    { "name": "title", "type": [ "null", "string" ], "default": null },
    


Avro는 Kafka Connect 작업자 구성에서 기본 변환기로 설정됩니다. 예를 들어 필요한 value.converter 구성을 설정하여 Protobuf를 사용하려는 경우 이를 재정의할 수 있습니다.

curl -i -X PUT -H "Accept:application/json" \
    -H  "Content-Type:application/json" http://localhost:8083/connectors/source-filepulse-xml-02/config \
    -d '{
        "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
        "fs.scan.directory.path":"/data/",
        "fs.scan.interval.ms":"10000",
        "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
        "file.filter.regex.pattern":".*\\.xml$",
        "offset.strategy":"name",
        "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.XMLFileInputReader",
        "xpath.expression": "/*/book",
        "topic":"books-02",
        "internal.kafka.reporter.bootstrap.servers": "broker:29092",
        "internal.kafka.reporter.topic":"_connect-file-pulse-status",
        "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.LogCleanupPolicy",
        "tasks.max": 1,
        "value.converter":"io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url":"http://schema-registry:8081"
    }'


이번에는 데이터가 Protobuf로 작성되었으며 ksqlDB에서도 확인할 수 있습니다(메시지를 읽을 때 직렬화 방법을 가장 잘 추측하고 적절한 디시리얼라이저를 자동으로 선택함).

ksql> PRINT 'books-02' FROM BEGINNING;
Key format: ¯\_()_/¯ - no data processed
Value format: PROTOBUF or KAFKA_STRING
rowtime: 2020/10/02 11:31:34.066 Z, key: <null>, value: id: "bk001" author: "Writer" title: "The First Book" genre: "Fiction" price: "44.95" pub_date: "2000-10-01" review: "An amazing story of nothing."
rowtime: 2020/10/02 11:31:34.068 Z, key: <null>, value: id: "bk002" author: "Poet" title: "The Poet\'s First Poem" genre: "Poem" price: "24.95" pub_date: "2000-10-01" review: "Least poetic poems."


약간의 ksqlDB



플랫 파일에서 Kafka 주제로 스트리밍되는 데이터를 사용하여 다음을 수행할 수 있습니다.

ksql> CREATE STREAM BOOKS WITH (KAFKA_TOPIC='books-02',VALUE_FORMAT='PROTOBUF');

 Message
----------------
 Stream created
----------------
ksql>
ksql> SET 'auto.offset.reset' = 'earliest';
Successfully changed local property 'auto.offset.reset' to 'earliest'. Use the UNSET command to revert your change.
ksql> SELECT * FROM BOOKS EMIT CHANGES LIMIT 2;
+--------+---------+-----------------------+---------+--------+------------+----------------------------+
|ID      |AUTHOR   |TITLE                  |GENRE    |PRICE   |PUB_DATE    |REVIEW                      |
+--------+---------+-----------------------+---------+--------+------------+----------------------------+
|bk001   |Writer   |The First Book         |Fiction  |44.95   |2000-10-01  |An amazing story of nothing |
|bk002   |Poet     |The Poet's First Poem  |Poem     |24.95   |2000-10-01  |Least poetic poems.         |
Limit Reached
Query terminated


FilePulse를 사용한 XML 수집의 더 많은 순열은 확인하십시오.

XML을 Kafka로 가져오기 위한 다른 옵션은 무엇입니까?



FilePulse는 여기에서 훌륭하게 작동했으며 분명히 처리 및 파일 처리 옵션lot of flexibility이 있습니다. XSD 없이도 XML에서 페이로드의 스키마를 유추할 수 있다는 점도 매우 편리합니다.

그러나 데이터가 플랫 파일에 없으면 어떻게 될까요? 불행히도 이 상황에서는 다른 옵션을 찾아야 합니다.
  • 옵션 1:
  • 옵션 2:

  • 👾 사용해 보세요!



    GitHub 에서 Docker Compose를 사용하여 직접 실행하는 코드를 찾을 수 있습니다.

    좋은 웹페이지 즐겨찾기