Spring Cloud를 사용한 기능적 Kafka - 1부
8082 단어 kafkaspringfunctionalcloud
전제 조건:
설정:
이 기사는 먼저 Spring Cloud Stream을 사용하여 기능적 kafka 소비자가 있는 문자열로 Kafka에 웹 API 게시 이벤트를 설정하는 것으로 시작합니다.
환경 설정
여기에서 Apache ZooKeeper를 다운로드하십시오.
https://zookeeper.apache.org/releases.html#download
압축을 풀고 작업 폴더로 이동합니다(
$ZOOKEEPER_HOME
사용)여기에서 Kafka를 다운로드하십시오.
https://kafka.apache.org/downloads
다시 아카이브 압축을 풀고 작업 폴더로 이동합니다. 이번에는 작업 폴더를
$KAFKA_HOME
로 참조하겠습니다.다음을 사용하여 스키마 레지스트리를 다운로드하고 압축을 풉니다.
$ wget https://packages.confluent.io/archive/7.0/confluent-community-7.0.1.tar.gz
$ tar -xf confluent-community-7.0.1.tar.gz
$ cd confluent-7.0.1
$CONFLUENT_HOME
에서 이 애플리케이션 위치를 참조합니다.환경 구성
먼저 ZooKeeper를 설정합시다!
주키퍼 구성
예제 구성을 복사하여 구성 파일을 만듭니다.
ZOOKEEPER_HOME $> cp conf/zoo_sample.cfg conf/zoo.cfg
편집할 수 있는 유일한 값은 메타데이터를 편안하게 저장할 수 있는 위치에 대한 dataDir 디렉터리입니다.
...
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=./data
파일을 저장하면 Kafka 구성으로 이동합니다.
카프카 구성
여기서 우리가 관심 있는 파일은 다음과 같습니다.
$KAFKA_HOME/config/server.properties
변경 가능성이 가장 높은 두 가지 값이 있습니다.
log.dirs=./logs
zookeeper.connect=localhost:2181
이렇게 하면 로그가 액세스 가능한 위치에 있고 Kafka가 ZooKeeper에 연결하여 데이터를 유지할 수 있습니다.
스키마 레지스트리 구성
여기에서 하나의 파일만 확인해야 합니다:
$CONFLUENT_HOME/etc/schema-registry/schema-registry.properties
모든 기본값은 괜찮을 것입니다.
listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://localhost:9092
kafkastore.topic=_schemas
debug=false
ZooKeeper 및 Kafka 시작
Kafka에서
String
값으로 작업하고 있으므로 먼저 Schema Registry를 사용하지 않을 것이므로 시작하겠습니다.$ZOOKEEPER_HOME > bin/zkServer.sh start conf/zoo.cfg
ZooKeeper JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED
$KAFKA_HOME > bin/kafka-server-start.sh config/server.properties
...
[2022-04-25 11:19:17,742] INFO Kafka version: 3.1.0 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-25 11:19:17,743] INFO Kafka commitId: 37edeed0777bacb3 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-25 11:19:17,743] INFO Kafka startTimeMs: 1650910757737 (org.apache.kafka.common.utils.AppInfoParser)
[2022-04-25 11:19:17,745] INFO [KafkaServer id=0] started (kafka.server.KafkaServer)
좋아, 프로그래밍하자!!!
직원 API
이것은 kafka 메시지를 생성하기 위한 단일
POST
끝점이 있는 간단한 API입니다.https://start.spring.io으로 이동하여 기본 웹 애플리케이션을 생성하십시오.
다운로드를 클릭하고
demo.zip
파일을 작업공간으로 확장합니다.시작하려면 Kafka에 게시할 몇 가지 종속 항목이 필요합니다.
pom.xml
파일에 다음 종속성을 추가합니다.<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.4</version>
</dependency>
다음으로 엔드포인트를 제공할 컨트롤러를 생성합니다.
package com.example.demo;
import com.example.demo.model.Employee;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.net.URI;
import java.util.UUID;
@RequestMapping("/employee")
@RestController
public class EmployeeController {
@Autowired
protected KafkaTemplate<String, String> kafka;
public ResponseEntity<Void> createEmployee(@RequestParam("firstname") String firstname,
@RequestParam("lastname") String lastname) {
String id = UUID.randomUUID().toString();
kafka.send("employee", id, String.format("%s, %s", lastname, firstname));
return ResponseEntity.created(URI.create(String.format("/employee/%s", id))).build();
}
}
코드를 빠르게 살펴보겠습니다...
먼저
KafkaTemplate<String, String> template
를 주입합니다. 멋진 작업을 수행하지 않기 때문에 메시지 키를 java.lang.String
로 보내고 메시지 본문도 java.lang.String
유형이 됩니다.우리
@PostMapping
는 단순히 2개의 쿼리 매개변수가 있는 엔드포인트가 될 것입니다.따라서 URL의 형식은 다음과 같습니다.
http://localhost:8050/employee?firstname=Paula&lastname=Abdul
API를 호출할 때 다음 구문과 함께 Httpie를 사용합니다.
http POST http://localhost:8070/employee firstname==Paula lastname==Abdul
다음으로 우리는 임의의 UUID를 메시지 ID로 생성하고 이를 성 및 이름의 연결된 문자열과 함께
employee
주제로 보냅니다.String id = UUID.randomUUID().toString();
kafka.send("employee", id, String.format("%s, %s", lastname, firstname));
애플리케이션을 시작하기 전에 구성을 조정해 보겠습니다.
src/main/resources/application.yml
파일에서 다음과 동일하게 만들어 보겠습니다.spring:
kafka:
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
bootstrap-servers:
- localhost:9092
server:
port: 8070
이제 애플리케이션을 실행하고 엔드포인트에 도달할 수 있습니다.
./mvnw spring-boot:run
...
: Tomcat initialized with port(s): 8070 (http)
: Starting service [Tomcat]
: Starting Servlet engine: [Apache Tomcat/9.0.54]
: Initializing Spring embedded WebApplicationContext
: Root WebApplicationContext: initialization completed in 754 ms
: Tomcat started on port(s): 8070 (http) with context path ''
: Started DemoApplication in 1.522 seconds (JVM running for 1.841)
끝점을 호출합니다.
$ http POST http://localhost:8070/employee firstname==Andrew lastname==Marshall
HTTP/1.1 201
Connection: keep-alive
Content-Length: 0
Date: Wed, 27 Apr 2022 17:30:39 GMT
Keep-Alive: timeout=60
Location: /employee/830da346-38b9-4d5b-a051-a302c395333e
kafka-console-consumer.sh
명령을 사용하여 Kafka에서 메시지를 추적할 수 있습니다.$ $KAFKA_HOME/bin/kafka-console-consumer --topic employee --bootstrap-server localhost:9092 --from-beginning
Marshall, Andrew
다음 기사 Spring Cloud Stream을 사용하여 기능 소비자를 설정합니다...
Reference
이 문제에 관하여(Spring Cloud를 사용한 기능적 Kafka - 1부), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/anthonyikeda/functional-kafka-with-spring-cloud-part-1-2h7h텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)