Kafka S01/E03으로 데이터 스트림 전송 - JSON 파일 로드

이것은 데이터 흐름을 카프카로 전송하는 시리즈의 세 번째 문장이다.앞의 두 가지 예에서 우리는 Kafka Connect를 사용하여 CSV와 XML 파일의 기록을 Apache Kafka에 불러오는 방법을 보았다. 코드를 한 줄 작성하지 않아도 된다.이를 위해 우리는 Kafka Connect FilePulse connector를 사용했는데 이것은 많은 우수한 특성을 포함하여 데이터를 해석하고 변환한다.


  • 이제 JSON 데이터를 어떻게 통합하는지 봅시다. 이것은 대부분의 프로젝트에서 광범위하게 사용되는 파일 형식입니다. (웹 기반 응용 프로그램은 XML보다 더 인기가 많습니다.)

    카프카 연결 파일 펄스 연결기


    이전 글을 읽었다면 다음 섹션(즉 데이터 섭취)으로 넘어가십시오.

    Kafka Connect FilePulse connector는 기능이 강한 원본 연결기로 로컬 파일 시스템에서 데이터를 쉽게 해석하고 변환하며 아파치 카프카에 데이터를 불러올 수 있다.CSV, XML, JSON, LOG4J, AVRO 등 다양한 파일 형식에 대한 내장 지원을 제공합니다.
    FilePulse에 대해 자세히 알아보려면 다음 내용을 참조하십시오.
  • Kafka Connect FilePulse - One Connector to Ingest them All!
  • 자세한 내용은 documentation here 를 참조하십시오.

    커넥터 사용 방법


    Kafka Connect FilePulse 커넥터를 사용하는 가장 간단하고 빠른 방법은 Docker Hub에서 제공하는 Docker 이미지를 사용하는 것입니다.
    $ docker pull streamthoughts/kafka-connect-file-pulse:1.6.3
    
    GitHub 프로젝트 저장소에서 제공하는 docker-compose.yml 파일을 다운로드하여 Kafka Connect와 FilePulse 커넥터가 미리 설치된 융합 플랫폼을 신속하게 시작할 수 있습니다.
    $ wget https://raw.githubusercontent.com/streamthoughts/kafka-connect-file-pulse/v1.6.3/docker-compose.yml
    $ docker-compose up -d
    
    모든 Docker 컨테이너를 시작하면 커넥터가 http://localhost:8083 에서 액세스할 수 있는 Kafka Connect Worker에 설치되어 있는지 확인할 수 있습니다.
    $ curl -s localhost:8083/connector-plugins|jq '.[].class'|egrep FilePulse
    
    "io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector"
    
    참고: 커넥터를 GitHub Releases Page 또는 Confluent Hub 에서 설치할 수도 있습니다.

    데이터를 섭취하다


    단일 JSON 문서가 포함된 파일을 읽으려면 BytesArrayInputReader 를 사용합니다.이 리더는 모든 원본 파일에 대한 기록을 만들 수 있도록 합니다.이 리더가 생성하는 각 레코드에는 message 바이트 [] 유형의 필드가 있습니다.byte[] 값은 소스 파일(즉 JSON 문서)의 전체 내용입니다.
    그리고 이 필드를 해석하기 위해 FilePulse 커넥터에서 제공하는 처리 필터링 메커니즘을 사용합니다. 특히 JSONFilter
    이 최소 구성으로 커넥터를 만듭니다.
    $ curl \
        -i -X PUT -H "Accept:application/json" \
        -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
        -d '{
            "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
            "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
            "fs.scan.interval.ms":"10000",
            "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
            "file.filter.regex.pattern":".*\\.json$",
            "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
            "offset.strategy":"name",
            "topic":"tracks-filepulse-json-00",
            "internal.kafka.reporter.bootstrap.servers": "broker:29092",
            "internal.kafka.reporter.topic":"connect-file-pulse-status",
            "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
            "filters": "ParseJSON",
            "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
            "filters.ParseJSON.source":"message",
            "filters.ParseJSON.merge":"true",
            "tasks.max": 1
        }'
    
    참고: Connect FilePulse 커넥터는 등록 정보fs.scan.directory.path를 사용하여 설정한 입력 디렉토리를 정기적으로 검색합니다.그런 다음 패턴과 일치하는 파일 .*\\.json$ 을 찾습니다.offset.strategy 값에 따라 각 파일에 대해 고유한 식별 및 추적을 수행합니다.여기서 설정 지정한 파일은 그 이름으로 표시됩니다.
    다음과 같은 유효한 JSON 파일을 만듭니다.
    $ cat <<EOF > track.json
    { 
      "track": {
         "title":"Star Wars (Main Theme)",
         "artist":"John Williams, London Symphony Orchestra",
         "album":"Star Wars",
         "duration":"10:52"
      }
    }
    EOF
    
    그런 다음 이 파일을 호스트에서 커넥터를 실행하는 Docker 컨테이너로 복사합니다.다음 명령을 실행할 수 있습니다.
    // Create the target directory
    $ docker exec -it connect mkdir -p /tmp/kafka-connect/examples
    
    // Copy host file to docker-container
    $ docker cp track.json connect://tmp/kafka-connect/examples/track-00.json
    
    마지막으로 tracks-filepulse-json-00라는 주제를 사용하여 커넥터가 JSON 파일을 감지하고 처리했는지 확인합니다.
    $ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
            -b localhost:9092 \
            -t tracks-filepulse-json-00 \
            -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
    
    (출력)
    {
      "message": {
        "bytes": "{ \n  \"track\": {\n     \"title\":\"Star Wars (Main Theme)\",\n     \"artist\":\"John Williams, London Symphony Orchestra\",\n     \"album\":\"Star Wars\",\n     \"duration\":\"10:52\"\n  }\n}\n"
      },
      "track": {
        "Track": {
          "title": {
            "string": "Star Wars (Main Theme)"
          },
          "artist": {
            "string": "John Williams, London Symphony Orchestra"
          },
          "album": {
            "string": "Star Wars"
          },
          "duration": {
            "string": "10:52"
          }
        }
      }
    }
    
    주의: 위의 예시에서 우리는 kafkacat 을 사용하여 주제를 사용합니다.옵션-o-1 최신 메시지만 사용

    필드 제외

    JSONFilter 원래 JSON 문자열이 포함된 원래 필드는 자동으로 삭제되지 않습니다(즉, message.이 필드를 유지하지 않으려면 다음과 같이 ExcludeFilter 를 사용하여 삭제할 수 있습니다.
    $ curl \
        -i -X PUT -H "Accept:application/json" \
        -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
        -d '{
            "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
            "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
            "fs.scan.interval.ms":"10000",
            "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
            "file.filter.regex.pattern":".*\\.json$",
            "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
            "offset.strategy":"name",
            "topic":"tracks-filepulse-json-01",
            "internal.kafka.reporter.bootstrap.servers": "broker:29092",
            "internal.kafka.reporter.topic":"connect-file-pulse-status",
            "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
            "filters": "ParseJSON, ExcludeFieldMessage",
            "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
            "filters.ParseJSON.source":"message",
            "filters.ParseJSON.merge":"true",
            "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
            "filters.ExcludeFieldMessage.fields":"message",
            "tasks.max": 1
        }'
    
    앞에서 설명한 대로 JSON 파일을 Docker 컨테이너로 복사합니다.
    $ docker cp track.json \
    connect://tmp/kafka-connect/examples/track-01.json
    
    다음 명령을 실행하여 출력 테마tracks-filepulse-json-01를 사용합니다.
    $ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
            -b localhost:9092 \
            -t tracks-filepulse-json-01 \
            -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
    
    (출력)
    {
      "track": {
        "Track": {
          "title": {
            "string": "Star Wars (Main Theme)"
          },
          "artist": {
            "string": "John Williams, London Symphony Orchestra"
          },
          "album": {
            "string": "Star Wars"
          },
          "duration": {
            "string": "10:52"
          }
        }
      }
    }
    
    그렇습니다!우리는 입력 파일에 포함된 메시지와 유사한 구조가 뚜렷한 메시지를 성공적으로 생성했다.
    이제 우리 한 걸음 더 나아가자.

    빈 값 처리


    때때로 빈 값이 있는 JSON 문서를 처리해야 할 수도 있습니다.기본적으로, 만약 우리가 지금까지 사용한 설정을 사용한다면, 서열화 기간에null 값을 무시할 것입니다.
    커넥터가 null 값을 포함하는 필드의 유형을 추정할 수 없기 때문입니다.
    그러나, 우리는null값의 유형을 정의하고 기본값을 설정할 수 있습니다.
    참고: AppendFilter Connect FilePulse 커넥터에서 제공하는 기본 표현식 언어로 기록 필드에 액세스하고 조작할 수 있습니다.
    커넥터 구성을 업데이트합니다.
    $ curl \
        -i -X PUT -H "Accept:application/json" \
        -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-02/config \
        -d '{
            "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
            "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
            "fs.scan.interval.ms":"10000",
            "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
            "file.filter.regex.pattern":".*\\.json$",
            "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
            "offset.strategy":"name",
            "topic":"tracks-filepulse-json-02",
            "internal.kafka.reporter.bootstrap.servers": "broker:29092",
            "internal.kafka.reporter.topic":"connect-file-pulse-status",
            "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
            "filters": "ParseJSON, ExcludeFieldMessage, SetDefaultRank",
            "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
            "filters.ParseJSON.source":"message",
            "filters.ParseJSON.merge":"true",
            "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
            "filters.ExcludeFieldMessage.fields":"message",
            "filters.SetDefaultRank.type":"io.streamthoughts.kafka.connect.filepulse.filter.AppendFilter",
            "filters.SetDefaultRank.field":"$value.track.rank",
            "filters.SetDefaultRank.value":"{{ converts(nlv($value.track.rank, 0), '\''INTEGER'\'') }}",
            "filters.SetDefaultRank.overwrite": "true",
            "tasks.max": 1
        }'
    
    다음 내용이 포함된 두 번째 JSON 문서를 만듭니다.
    $ cat <<EOF > track-with-null.json
    { 
      "track": {
         "title":"Duel of the Fates",
         "artist":"John Williams, London Symphony Orchestra",
         "album":"Star Wars",
         "duration":"4:14",
         "rank": null
      }
    }
    EOF
    
    앞에서 설명한 대로 Docker 컨테이너에 복사합니다.
    $ docker cp track-with-null.json \
    connect://tmp/kafka-connect/examples/track-02.json
    
    다음 출력 테마 사용tracks-filepulse-json-01:
    $ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
            -b localhost:9092 \
            -t tracks-filepulse-json-02 \
            -C -J -q -o-1 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
    
    (출력)
    {
      "track": {
        "Track": {
          "title": {
            "string": "Duel of the Fates"
          },
          "artist": {
            "string": "John Williams, London Symphony Orchestra"
          },
          "album": {
            "string": "Star Wars"
          },
          "duration": {
            "string": "4:14"
          },
          "rank": {
            "int": 0
          }
        }
      }
    }
    
    마지막으로, int 형식의 필드 rank 를 포함하고 기본값 0 으로 초기화하는 출력 메시지를 받아야 합니다.

    단순 연결 표현식 언어(SCEL) JSON 배열 분할


    마지막으로 JSON 레코드 그룹을 포함하는 JSON 파일을 처리하는 것도 흔하다.
    그룹에서 각 요소에 대한 기록을 생성하려면 explode.arrayJSONFilter 속성을 true 로 설정해야 합니다.
    커넥터 구성을 업데이트하려면 다음과 같이 하십시오.
    $ curl \
        -i -X PUT -H "Accept:application/json" \
        -H  "Content-Type:application/json" http://localhost:8083/connectors/tracks-json-filepulse-00/config \
        -d '{
            "connector.class":"io.streamthoughts.kafka.connect.filepulse.source.FilePulseSourceConnector",
            "fs.scan.directory.path":"/tmp/kafka-connect/examples/",
            "fs.scan.interval.ms":"10000",
            "fs.scan.filters":"io.streamthoughts.kafka.connect.filepulse.scanner.local.filter.RegexFileListFilter",
            "file.filter.regex.pattern":".*\\.json$",
            "task.reader.class": "io.streamthoughts.kafka.connect.filepulse.reader.BytesArrayInputReader",
            "offset.strategy":"name",
            "topic":"tracks-filepulse-json-03",
            "internal.kafka.reporter.bootstrap.servers": "broker:29092",
            "internal.kafka.reporter.topic":"connect-file-pulse-status",
            "fs.cleanup.policy.class": "io.streamthoughts.kafka.connect.filepulse.clean.DeleteCleanupPolicy",
            "filters": "ParseJSON, ExcludeFieldMessage",
            "filters.ParseJSON.type":"io.streamthoughts.kafka.connect.filepulse.filter.JSONFilter",
            "filters.ParseJSON.source":"message",
            "filters.ParseJSON.merge":"true",
            "filters.ParseJSON.explode.array":"true",
            "filters.ExcludeFieldMessage.type":"io.streamthoughts.kafka.connect.filepulse.filter.ExcludeFilter",
            "filters.ExcludeFieldMessage.fields":"message",
            "tasks.max": 1
        }'
    
    두 개의 JSON 객체를 포함하는 파일을 만듭니다.
    $ cat <<EOF > tracks.json
    [
      {
        "track": {
          "title": "Star Wars (Main Theme)",
          "artist":  "John Williams, London Symphony Orchestra",
          "album": "Star Wars",
          "duration": "10:52"
        }
      },
      {
        "track": {
          "title":  "Duel of the Fates",
          "artist": "John Williams, London Symphony Orchestra",
          "album": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)",
          "duration": "4:14"
        }
      }
    ]
    EOF
    
    앞에서 설명한 대로 Docker 컨테이너에 복사합니다.
    $ docker cp tracks.json \
    connect://tmp/kafka-connect/examples/tracks-00.json
    
    그리고 출력 테마tracks-filepulse-json-02를 사용합니다.
    $ docker run --tty --network=host edenhill/kafkacat:1.6.0 kafkacat \
            -b localhost:9092 \
            -t tracks-filepulse-json-03 \
            -C -J -q -o0 -s key=s -s value=avro -r http://localhost:8081 | jq .payload.ConnectDefault
    
    (출력)
    {
      "track": {
        "Track": {
          "title": {
            "string": "Star Wars (Main Theme)"
          },
          "artist": {
            "string": "John Williams, London Symphony Orchestra"
          },
          "album": {
            "string": "Star Wars"
          },
          "duration": {
            "string": "10:52"
          }
        }
      }
    }
    {
      "track": {
        "Track": {
          "title": {
            "string": "Duel of the Fates"
          },
          "artist": {
            "string": "John Williams, London Symphony Orchestra"
          },
          "album": {
            "string": "Star Wars: The Phantom Menace (Original Motion Picture Soundtrack)"
          },
          "duration": {
            "string": "4:14"
          }
        }
      }
    }
    
    봐라!이제 Kafka Connect를 사용하여 JSON 파일을 처리하는 방법을 알게 되었습니다.

    단순 연결 표현식 언어(SCEL) 결론


    본고에서 보듯이 Kafka Connect를 사용하여 코드 한 줄을 작성하지 않아도 JSON 파일의 기록을 Apache Kafka에 쉽게 불러올 수 있습니다.Connect File Pulse connector는 Apache Kafka에 데이터를 로드하기 전에 데이터를 쉽게 조작할 수 있는 강력한 솔루션입니다.
    이 글을 공유해 주십시오. 만약 당신이 이 프로젝트를 좋아한다면.너는 심지어 하나를 추가할 수 있다⭐ GitHub 저장소에서 지원합니다.
    고맙습니다.

    좋은 웹페이지 즐겨찾기