자바 가 CSV 데 이 터 를 kafka 에 보 낸 예제

14411 단어 JavaCSVkafka
왜 CSV 데 이 터 를 kafka 에 보 냈 습 니까?
  • flink 가 스 트림 계산 을 할 때 kafka 메 시 지 를 데이터 소스 로 사용 하 는 것 이 자주 사용 되 는 수단 이기 때문에 flink 를 학습 하고 개발 하 는 과정 에서 데이터 세트 파일 의 기록 을 kafka 에 보 내 끊 임 없 는 데 이 터 를 모 의 한다.
  • 전체 절 차 는 다음 과 같다.
  • 당신 은 이렇게 하 는 것 이 쓸데없는 짓 이 라 고 생각 할 수 있 습 니 다.flink 는 CSV 를 직접 읽 으 면 되 지 않 습 니까?이렇게 하 는 이 유 는 다음 과 같다.
  • 우선,이것 은 학습 과 개발 시의 방법 으로 데이터 세트 는 CSV 파일 이 고 생산 환경의 실시 간 데 이 터 는 kafka 데이터 소스 이다.
  • 그 다음으로 자바 응용 에 특수 한 논 리 를 추가 할 수 있다.예 를 들 어 데이터 처리,종합 통계(flink 결과 와 비교 검증).
  • 또한 두 기록 의 실제 간격 이 1 분 이면 자바 응용 은 메 시 지 를 보 낼 때 1 분 간격 으로 다시 보 낼 수 있다.이 논 리 는 flink 커 뮤 니 티 의 demo 에서 구체 적 으로 실현 된다.이 demo 도 데이터 세트 를 kafka 에 보 내 고 flink 에서 kafka 를 소비 할 수 있다.주 소 는 https://github.com/ververica/sql-training
  • 이다.
    어떻게 CSV 데 이 터 를 kafka 에 보 냅 니까?
    앞의 그림 에서 보 듯 이 CSV 를 읽 고 kafka 에 메 시 지 를 보 내 는 작업 은 자바 응용 프로그램의 소행 이기 때문에 오늘 의 주요 작업 은 바로 이 자바 응용 프로그램 을 개발 하고 검증 하 는 것 이다.
    버 전 정보
  • JDK:1.8.0_181
  • 개발 도구:IntelliJ IDEA 2019.2.1(Ultimate Edition)
  • 개발 환경:Win 10
  • Zookeeper:3.4.13
  • Kafka:2.4.0(scala:2.12)
  • 데이터 세트 에 대하 여
  • 이번 실전 에 사 용 된 데이터 세트 는 CSV 파일 로 그 안에 104 만 개의 타 오 바 오 사용자 행위 데이터 가 있 습 니 다.이 데이터 출처 는 아 리 운 천지 공개 데이터 세트 입 니 다.저 는 이 데 이 터 를 소량 조정 했 습 니 다.
  • 이 CSV 파일 은 CSDN 에서 다운로드 할 수 있 습 니 다.주소:https://download.csdn.net/download/boling_cavalry/12381698
  • 도 제 Github 에서 다운로드 할 수 있 습 니 다.주소:https://raw.githubusercontent.com/zq2599/blog_demos/master/files/UserBehavior.7z
  • 이 CSV 파일 의 내용 은 모두 6 열 로 각 열의 의 미 는 다음 과 같다.
  • 열 이름
    설명 하 다.
    사용자 ID
    정수 형식,직렬 화 된 사용자 ID
    상품 ID
    정수 유형,직렬 화 된 상품 ID
    상품 종류 ID
    정수 유형,서열 화 된 상품 소속 클래스 ID
    행동 유형
    문자열,매 거 진 형식,포함('pv','buy','cart','fav')
    타임 스탬프
    행동 발생 시간 스탬프
    시간 문자열
    타임 스탬프 필드 에 따라 생 성 된 시간 문자열
  • 이 데이터 세트 에 대한 상세 한 정 보 는 플 랭크 학습 에 사용 할 데이터 세트 준비
  • 을 참고 하 시기 바 랍 니 다.
    Java 응용 프로그램 안내
    인 코딩 하기 전에 구체 적 인 내용 을 열거 한 다음 에 하나씩 실현 합 니 다.
  • CSV 에서 기록 을 읽 는 도구 클래스:UserBehaviorCsvFileReader
  • 각 기록 에 대응 하 는 Bean 류:UserBehavior
  • 자바 대상 이 JSON 의 직렬 화 클래스 로 직렬 화 됨:JSonSerializer
  • kafka 에 메 시 지 를 보 내 는 도구 클래스:Kafka Producer
  • 응용 프로그램 종류,프로그램 입구:SendMessage 응용 프로그램
  • 상기 다섯 가지 유형 은 자바 응용 작업 을 완성 할 수 있 습 니 다.다음 에 인 코딩 을 시작 하 겠 습 니 다.
    직접 다운로드 원본
    코드 를 쓰 지 않 으 려 면 GitHub 에서 이 프로젝트 의 원본 코드,주소 와 링크 정 보 를 아래 표 에서 직접 다운로드 할 수 있 습 니 다.
    명칭.
    링크
    비고
    프로젝트 홈 페이지
    https://github.com/zq2599/blog_demos
    이 항목 은 GitHub 홈 페이지 에 있 습 니 다.
    git 창고 주소(https)
    https://github.com/zq2599/blog_demos.git
    이 프로젝트 의 원본 창고 주소,https 프로 토 콜
    git 창고 주소(ssh)
    [email protected]:zq2599/blog_demos.git
    이 프로젝트 소스 의 창고 주소,ssh 프로 토 콜
    이 git 프로젝트 에는 여러 개의 폴 더 가 있 습 니 다.이 장의 원본 코드 는 flinksql 이 폴 더 아래 에 있 습 니 다.아래 그림 의 빨 간 상자 와 같 습 니 다.

    부호화
    maven 프로젝트 를 만 듭 니 다.pom.xml 는 다음 과 같 습 니 다.중요 한 jackson 과 javacsv 의 의존 도 는 다음 과 같 습 니 다.
    
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
    
     <groupId>com.bolingcavalry</groupId>
     <artifactId>flinksql</artifactId>
     <version>1.0-SNAPSHOT</version>
    
     <properties>
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      <flink.version>1.10.0</flink.version>
      <kafka.version>2.2.0</kafka.version>
      <java.version>1.8</java.version>
      <scala.binary.version>2.11</scala.binary.version>
      <maven.compiler.source>${java.version}</maven.compiler.source>
      <maven.compiler.target>${java.version}</maven.compiler.target>
     </properties>
    
     <dependencies>
      <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.version}</version>
      </dependency>
    
      <dependency>
       <groupId>com.fasterxml.jackson.core</groupId>
       <artifactId>jackson-databind</artifactId>
       <version>2.9.10.1</version>
      </dependency>
    
      <!-- Logging dependencies -->
      <dependency>
       <groupId>org.slf4j</groupId>
       <artifactId>slf4j-log4j12</artifactId>
       <version>1.7.7</version>
       <scope>runtime</scope>
      </dependency>
      <dependency>
       <groupId>log4j</groupId>
       <artifactId>log4j</artifactId>
       <version>1.2.17</version>
       <scope>runtime</scope>
      </dependency>
      <dependency>
       <groupId>net.sourceforge.javacsv</groupId>
       <artifactId>javacsv</artifactId>
       <version>2.0</version>
      </dependency>
    
     </dependencies>
    
     <build>
      <plugins>
       <!-- Java Compiler -->
       <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.1</version>
        <configuration>
         <source>${java.version}</source>
         <target>${java.version}</target>
        </configuration>
       </plugin>
    
       <!-- Shade plugin to include all dependencies -->
       <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
         <!-- Run shade goal on package phase -->
         <execution>
          <phase>package</phase>
          <goals>
           <goal>shade</goal>
          </goals>
          <configuration>
           <artifactSet>
            <excludes>
            </excludes>
           </artifactSet>
           <filters>
            <filter>
             <!-- Do not copy the signatures in the META-INF folder.
             Otherwise, this might cause SecurityExceptions when using the JAR. -->
             <artifact>*:*</artifact>
             <excludes>
              <exclude>META-INF/*.SF</exclude>
              <exclude>META-INF/*.DSA</exclude>
              <exclude>META-INF/*.RSA</exclude>
             </excludes>
            </filter>
           </filters>
          </configuration>
         </execution>
        </executions>
       </plugin>
      </plugins>
     </build>
    </project>
    CSV 에서 기록 을 읽 는 도구 클래스:UserBehaviorCsvFileReader,다음 주 프로그램 에 서 는 자바 8 의 Steam API 로 집합 을 처리 하기 때문에 UserBehaviorCsvFileReader 는 Supplier 인 터 페 이 스 를 실현 합 니 다.
    
    public class UserBehaviorCsvFileReader implements Supplier<UserBehavior> {
    
     private final String filePath;
     private CsvReader csvReader;
    
     public UserBehaviorCsvFileReader(String filePath) throws IOException {
    
      this.filePath = filePath;
      try {
       csvReader = new CsvReader(filePath);
       csvReader.readHeaders();
      } catch (IOException e) {
       throw new IOException("Error reading TaxiRecords from file: " + filePath, e);
      }
     }
    
     @Override
     public UserBehavior get() {
      UserBehavior userBehavior = null;
      try{
       if(csvReader.readRecord()) {
        csvReader.getRawRecord();
        userBehavior = new UserBehavior(
          Long.valueOf(csvReader.get(0)),
          Long.valueOf(csvReader.get(1)),
          Long.valueOf(csvReader.get(2)),
          csvReader.get(3),
          new Date(Long.valueOf(csvReader.get(4))*1000L));
       }
      } catch (IOException e) {
       throw new NoSuchElementException("IOException from " + filePath);
      }
    
      if (null==userBehavior) {
       throw new NoSuchElementException("All records read from " + filePath);
      }
    
      return userBehavior;
     }
    }
    각 기록 에 대응 하 는 Bean 클래스:UserBehavior 는 CSV 기록 형식 과 일치 하면 됩 니 다.시간 을 표시 하 는 ts 필드 는 JSonFormat 주 해 를 사용 하여 직렬 화 할 때 형식 을 제어 합 니 다.
    
    public class UserBehavior {
    
     @JsonFormat
     private long user_id;
    
     @JsonFormat
     private long item_id;
    
     @JsonFormat
     private long category_id;
    
     @JsonFormat
     private String behavior;
    
     @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'")
     private Date ts;
    
     public UserBehavior() {
     }
    
     public UserBehavior(long user_id, long item_id, long category_id, String behavior, Date ts) {
      this.user_id = user_id;
      this.item_id = item_id;
      this.category_id = category_id;
      this.behavior = behavior;
      this.ts = ts;
     }
    }
    자바 대상 이 JSON 의 직렬 화 클래스 로 정렬 됨:JSonSerializer
    
    public class JsonSerializer<T> {
    
     private final ObjectMapper jsonMapper = new ObjectMapper();
    
     public String toJSONString(T r) {
      try {
       return jsonMapper.writeValueAsString(r);
      } catch (JsonProcessingException e) {
       throw new IllegalArgumentException("Could not serialize record: " + r, e);
      }
     }
    
     public byte[] toJSONBytes(T r) {
      try {
       return jsonMapper.writeValueAsBytes(r);
      } catch (JsonProcessingException e) {
       throw new IllegalArgumentException("Could not serialize record: " + r, e);
      }
     }
    }
    kafka 에 메 시 지 를 보 내 는 도구 클래스:Kafka Producer:
    
    public class KafkaProducer implements Consumer<UserBehavior> {
    
     private final String topic;
     private final org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> producer;
     private final JsonSerializer<UserBehavior> serializer;
    
     public KafkaProducer(String kafkaTopic, String kafkaBrokers) {
      this.topic = kafkaTopic;
      this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(createKafkaProperties(kafkaBrokers));
      this.serializer = new JsonSerializer<>();
     }
    
     @Override
     public void accept(UserBehavior record) {
      //        byte  
      byte[] data = serializer.toJSONBytes(record);
      //   
      ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic, data);
      //   
      producer.send(kafkaRecord);
    
      //   sleep       ,     kafka    flink        
      try {
       Thread.sleep(500);
      }catch(InterruptedException e){
       e.printStackTrace();
      }
     }
    
     /**
      * kafka  
      * @param brokers The brokers to connect to.
      * @return A Kafka producer configuration.
      */
     private static Properties createKafkaProperties(String brokers) {
      Properties kafkaProps = new Properties();
      kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
      kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
      kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
      return kafkaProps;
     }
    }
    마지막 으로 응용 클래스 SendMessageApplication 입 니 다.CSV 파일 경로,kafka 의 topic 와 borker 주 소 는 모두 여기에 설정 되 어 있 습 니 다.또한 자바 8 의 Stream API 를 통 해 소량의 코드 만 있 으 면 모든 작업 을 완성 할 수 있 습 니 다.
    
    public class SendMessageApplication {
    
     public static void main(String[] args) throws Exception {
      //     
      String filePath = "D:\\temp\\202005\\02\\UserBehavior.csv";
      // kafka topic
      String topic = "user_behavior";
      // kafka borker  
      String broker = "192.168.50.43:9092";
    
      Stream.generate(new UserBehaviorCsvFileReader(filePath))
        .sequential()
        .forEachOrdered(new KafkaProducer(topic, broker));
     }
    }
    검증 하 다.
  • kafka 가 준비 되 었 는 지 확인 하고 userbehavior 의 topic 가 만 들 어 졌 습 니 다.
  • CSV 파일 을 준비 하 십시오.
  • SendMessageApplication.java 의 파일 주소,kafka topic,kafka broker 세 개의 매개 변수 가 정확 하고 틀림 이 없 음 을 확인 합 니 다.
  • SendMessageApplication.java 실행;
  • 콘 솔 메시지 kafka 메 시 지 를 엽 니 다.참조 명령 은 다음 과 같 습 니 다.
  • 
    ./kafka-console-consumer.sh \
    --bootstrap-server 127.0.0.1:9092 \
    --topic user_behavior \
    --consumer-property group.id=old-consumer-test \
    --consumer-property consumer.id=old-consumer-cl \
    --from-beginning
  • 정상 적 인 상황 에서 바로 소식 을 볼 수 있다.아래 그림:

  • 이로써 자바 응용 을 통 해 사용자 행동 메시지 흐름 을 모 의 하 는 작업 이 완료 되 었 고 다음 flink 실전 은 이 를 데이터 소스 로 사용 합 니 다.
    이상 은 자바 가 CSV 의 데 이 터 를 kafka 에 보 내 는 예제 의 상세 한 내용 입 니 다.자바 CSV 에 관 한 데 이 터 를 kafka 에 보 내 는 자 료 는 다른 관련 글 을 주목 하 세 요!

    좋은 웹페이지 즐겨찾기