자바 가 CSV 데 이 터 를 kafka 에 보 낸 예제
어떻게 CSV 데 이 터 를 kafka 에 보 냅 니까?
앞의 그림 에서 보 듯 이 CSV 를 읽 고 kafka 에 메 시 지 를 보 내 는 작업 은 자바 응용 프로그램의 소행 이기 때문에 오늘 의 주요 작업 은 바로 이 자바 응용 프로그램 을 개발 하고 검증 하 는 것 이다.
버 전 정보
설명 하 다.
사용자 ID
정수 형식,직렬 화 된 사용자 ID
상품 ID
정수 유형,직렬 화 된 상품 ID
상품 종류 ID
정수 유형,서열 화 된 상품 소속 클래스 ID
행동 유형
문자열,매 거 진 형식,포함('pv','buy','cart','fav')
타임 스탬프
행동 발생 시간 스탬프
시간 문자열
타임 스탬프 필드 에 따라 생 성 된 시간 문자열
Java 응용 프로그램 안내
인 코딩 하기 전에 구체 적 인 내용 을 열거 한 다음 에 하나씩 실현 합 니 다.
직접 다운로드 원본
코드 를 쓰 지 않 으 려 면 GitHub 에서 이 프로젝트 의 원본 코드,주소 와 링크 정 보 를 아래 표 에서 직접 다운로드 할 수 있 습 니 다.
명칭.
링크
비고
프로젝트 홈 페이지
https://github.com/zq2599/blog_demos
이 항목 은 GitHub 홈 페이지 에 있 습 니 다.
git 창고 주소(https)
https://github.com/zq2599/blog_demos.git
이 프로젝트 의 원본 창고 주소,https 프로 토 콜
git 창고 주소(ssh)
[email protected]:zq2599/blog_demos.git
이 프로젝트 소스 의 창고 주소,ssh 프로 토 콜
이 git 프로젝트 에는 여러 개의 폴 더 가 있 습 니 다.이 장의 원본 코드 는 flinksql 이 폴 더 아래 에 있 습 니 다.아래 그림 의 빨 간 상자 와 같 습 니 다.
부호화
maven 프로젝트 를 만 듭 니 다.pom.xml 는 다음 과 같 습 니 다.중요 한 jackson 과 javacsv 의 의존 도 는 다음 과 같 습 니 다.
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.bolingcavalry</groupId>
<artifactId>flinksql</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.10.0</flink.version>
<kafka.version>2.2.0</kafka.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.10.1</version>
</dependency>
<!-- Logging dependencies -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>net.sourceforge.javacsv</groupId>
<artifactId>javacsv</artifactId>
<version>2.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
</configuration>
</plugin>
<!-- Shade plugin to include all dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.0.0</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
CSV 에서 기록 을 읽 는 도구 클래스:UserBehaviorCsvFileReader,다음 주 프로그램 에 서 는 자바 8 의 Steam API 로 집합 을 처리 하기 때문에 UserBehaviorCsvFileReader 는 Supplier 인 터 페 이 스 를 실현 합 니 다.
public class UserBehaviorCsvFileReader implements Supplier<UserBehavior> {
private final String filePath;
private CsvReader csvReader;
public UserBehaviorCsvFileReader(String filePath) throws IOException {
this.filePath = filePath;
try {
csvReader = new CsvReader(filePath);
csvReader.readHeaders();
} catch (IOException e) {
throw new IOException("Error reading TaxiRecords from file: " + filePath, e);
}
}
@Override
public UserBehavior get() {
UserBehavior userBehavior = null;
try{
if(csvReader.readRecord()) {
csvReader.getRawRecord();
userBehavior = new UserBehavior(
Long.valueOf(csvReader.get(0)),
Long.valueOf(csvReader.get(1)),
Long.valueOf(csvReader.get(2)),
csvReader.get(3),
new Date(Long.valueOf(csvReader.get(4))*1000L));
}
} catch (IOException e) {
throw new NoSuchElementException("IOException from " + filePath);
}
if (null==userBehavior) {
throw new NoSuchElementException("All records read from " + filePath);
}
return userBehavior;
}
}
각 기록 에 대응 하 는 Bean 클래스:UserBehavior 는 CSV 기록 형식 과 일치 하면 됩 니 다.시간 을 표시 하 는 ts 필드 는 JSonFormat 주 해 를 사용 하여 직렬 화 할 때 형식 을 제어 합 니 다.
public class UserBehavior {
@JsonFormat
private long user_id;
@JsonFormat
private long item_id;
@JsonFormat
private long category_id;
@JsonFormat
private String behavior;
@JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss'Z'")
private Date ts;
public UserBehavior() {
}
public UserBehavior(long user_id, long item_id, long category_id, String behavior, Date ts) {
this.user_id = user_id;
this.item_id = item_id;
this.category_id = category_id;
this.behavior = behavior;
this.ts = ts;
}
}
자바 대상 이 JSON 의 직렬 화 클래스 로 정렬 됨:JSonSerializer
public class JsonSerializer<T> {
private final ObjectMapper jsonMapper = new ObjectMapper();
public String toJSONString(T r) {
try {
return jsonMapper.writeValueAsString(r);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + r, e);
}
}
public byte[] toJSONBytes(T r) {
try {
return jsonMapper.writeValueAsBytes(r);
} catch (JsonProcessingException e) {
throw new IllegalArgumentException("Could not serialize record: " + r, e);
}
}
}
kafka 에 메 시 지 를 보 내 는 도구 클래스:Kafka Producer:
public class KafkaProducer implements Consumer<UserBehavior> {
private final String topic;
private final org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> producer;
private final JsonSerializer<UserBehavior> serializer;
public KafkaProducer(String kafkaTopic, String kafkaBrokers) {
this.topic = kafkaTopic;
this.producer = new org.apache.kafka.clients.producer.KafkaProducer<>(createKafkaProperties(kafkaBrokers));
this.serializer = new JsonSerializer<>();
}
@Override
public void accept(UserBehavior record) {
// byte
byte[] data = serializer.toJSONBytes(record);
//
ProducerRecord<byte[], byte[]> kafkaRecord = new ProducerRecord<>(topic, data);
//
producer.send(kafkaRecord);
// sleep , kafka flink
try {
Thread.sleep(500);
}catch(InterruptedException e){
e.printStackTrace();
}
}
/**
* kafka
* @param brokers The brokers to connect to.
* @return A Kafka producer configuration.
*/
private static Properties createKafkaProperties(String brokers) {
Properties kafkaProps = new Properties();
kafkaProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
kafkaProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
kafkaProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getCanonicalName());
return kafkaProps;
}
}
마지막 으로 응용 클래스 SendMessageApplication 입 니 다.CSV 파일 경로,kafka 의 topic 와 borker 주 소 는 모두 여기에 설정 되 어 있 습 니 다.또한 자바 8 의 Stream API 를 통 해 소량의 코드 만 있 으 면 모든 작업 을 완성 할 수 있 습 니 다.
public class SendMessageApplication {
public static void main(String[] args) throws Exception {
//
String filePath = "D:\\temp\\202005\\02\\UserBehavior.csv";
// kafka topic
String topic = "user_behavior";
// kafka borker
String broker = "192.168.50.43:9092";
Stream.generate(new UserBehaviorCsvFileReader(filePath))
.sequential()
.forEachOrdered(new KafkaProducer(topic, broker));
}
}
검증 하 다.
./kafka-console-consumer.sh \
--bootstrap-server 127.0.0.1:9092 \
--topic user_behavior \
--consumer-property group.id=old-consumer-test \
--consumer-property consumer.id=old-consumer-cl \
--from-beginning
이로써 자바 응용 을 통 해 사용자 행동 메시지 흐름 을 모 의 하 는 작업 이 완료 되 었 고 다음 flink 실전 은 이 를 데이터 소스 로 사용 합 니 다.
이상 은 자바 가 CSV 의 데 이 터 를 kafka 에 보 내 는 예제 의 상세 한 내용 입 니 다.자바 CSV 에 관 한 데 이 터 를 kafka 에 보 내 는 자 료 는 다른 관련 글 을 주목 하 세 요!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
JPA + QueryDSL 계층형 댓글, 대댓글 구현(2)이번엔 전편에 이어서 계층형 댓글, 대댓글을 다시 리팩토링해볼 예정이다. 이전 게시글에서는 계층형 댓글, 대댓글을 구현은 되었지만 N+1 문제가 있었다. 이번에는 그 N+1 문제를 해결해 볼 것이다. 위의 로직은 이...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.