Oracle 스트리밍 서비스를 관리형 Apache Kafka로 사용

11139 단어 paasoraclekafka
Oracle Cloud Infrastructure Streaming은 대용량 데이터 흐름을 실시간으로 처리하는 완벽한 관리 및 확장 가능한 서비스입니다.
우리는 이를 Apache Kafka의 등가물로 볼 수 있다. 실제로는 API가 호환된다. 이것은 우리가 Kafka를 위해 작성한 응용 프로그램을 사용하여 코드를 다시 쓸 필요가 없다는 것을 의미한다.

문제 진술


왜 우리는 흐르는 미디어 서비스를 사용하는 것을 고려합니까?같은 이유로 우리는 카프카 사용을 고려할 것이다.
추가 이점은 PaaS 서비스이기 때문에 설정이 매우 간단하고 유지 보수가 거의 필요 없다는 것이다. (패치, 확장, HA 구성 잊어버리거나 디스크 공간을 소모하는 것)

어떻게 그것을 사용합니까


구성은 클라우드 콘솔에서 수동으로 또는 RESTAPI/SDK/Terraform/CLI를 사용하여 자동으로 수행됩니다.
발표/소비 소식이 관련되었을 때, 일은 갈수록 재미있어졌다.두 가지 방법이 있습니다.
  • OCI REST API
  • 카프카 호환 API
  • 일반적으로 나는 후자를 사용하는 경향이 있는데, 원인 중 하나는 카프카 프로토콜이 사용하는 TCP 연결 수명이 길기 때문이다.이런 메커니즘을 사용하면 데이터를 읽는 과정을 더욱 상호작용적으로 할 수 있다. 사람들은 일정한 시간을 들여 주제를 들을 수 있고, 메시지가 도착할 때 바로 소모된다.OCI REST API의 경우, 유일한 방법은 순환 중에 GetMessages 작업을 호출하는 것입니다. 사용할 것이 없을 때 바로 종료됩니다.
    다음 절에서는 Kafka 클라이언트를 사용하여 흐르는 미디어를 설정하고 흐르는 미디어에 연결하는 것이 얼마나 쉬운지 보여 드리겠습니다.

    설치 프로그램


    플로팅 탱크


    먼저, 흐름을 그룹화하는 플로팅 탱크를 만듭니다. (가장 간단한 방법은 흐름을 카프카 테마로 보는 것입니다.)

    제공되어야 하는 유일한 필수 매개 변수는 실제로는 플로팅 이름입니다.다른 설정에 대해서는 기본값을 사용할 수 있습니다.
    그러나 다음과 같은 두 가지 흥미로운 구성 매개 변수가 있습니다.
  • 노드 유형 - Oracle Cloud Region에서만 액세스할 수 있는 공통 흐름(인터넷에서 트래픽을 액세스할 수 있음)인지 개인 흐름인지 선택할 수 있습니다.
  • 암호화 설정 - 기본적으로 모든 메시지를 암호화합니다.우리는 설정에 대한 상세한 정보를 지정할 수 있다.
  • 흐르다


    이제 흐름 (주제라고도 함) 을 만들 수 있습니다.

    우선, 우리는 흐르는 물을 흐르는 탱크에 분배해야 한다.그리고 우리는 흐름의 이름을 제공해야 한다.
    우리는 섹션 메커니즘을 사용하여 읽기/쓰기 처리량을 제어할 수 있다(Kafka의 작업 원리와 같다).

    카프카 설정


    주어진 흐름 탱크의 상세한 정보를 포함하는 페이지를 열면 화면 상단에서 다음 단추를 찾을 수 있습니다.

    를 클릭하면 연결 세부 정보가 포함된 페이지가 표시됩니다.

    이 모든 데이터가 있으면 우리는 거의 Kafka 클라이언트를 사용하여 흐르는 미디어 서비스에 연결할 수 있다.그러나 우선 우리는 매우 중요한 화제인 안전에 대해 이야기해야 한다.

    안전에 관한 몇 마디 중요한 말


    암호화


    클라이언트와 서버 간의 연결은 전송 과정에서 TSL(SSL의 상속자)을 사용하여 암호화됩니다.기본적으로 단방향 인증을 사용합니다. 이것은 클라이언트가 서버 인증서를 인증하는 것을 의미합니다.
    인증 작업을 위해 클라이언트는 서버가 제공한 인증서를 신뢰해야 합니다.일반적으로 추가 구성이 필요합니다.
    클라이언트는 "ssl.ca.location"설정을 사용할 수 있습니다.이것은 적당한 신뢰 인증서 체인을 가진 파일을 가리키는 데 사용할 수 있다.파일 자체는 다음과 같습니다.
    # Server certificate
    -----BEGIN CERTIFICATE-----
    MIIFaDCCBFCgAwIBAgISESHkvZFwK9Qz0KsXD3x8p44aMA0GCSqGSIb3DQEBCwUA
    VQQDDBcqLmF3cy10ZXN0LnByb2dyZXNzLmNvbTCCASIwDQYJKoZIhvcNAQEBBQAD
    ggEPADCCAQoCggEBAMGPTyynn77hqcYnjWsMwOZDzdhVFY93s2OJntMbuKTHn39B
    ...
    bml6YXRpb252YWxzaGEyZzIuY3JsMIGgBggrBgEFBQcBAQSBkzCBkDBNBggrBgEF
    BQcwAoZBaHR0cDovL3NlY3VyZS5nbG9iYWxzaWduLmNvbS9jYWNlcnQvZ3Nvcmdh
    bml6YXRpb252YWxzaGEyZzJyMS5jcnQwPwYIKwYBBQUHMAGGM2h0dHA6Ly9vY3Nw
    lffygD5IymCSuuDim4qB/9bh7oi37heJ4ObpBIzroPUOthbG4gv/5blW3Dc=
    -----END CERTIFICATE-----
    
    # Trust chain intermediate certificate
    ----------BEGIN CERTIFICATE-----
    MIIEaTCCA1GgAwIBAgILBAAAAAABRE7wQkcwDQYJKoZIhvcNAQELBQAwVzELMAkG
    C33JiJ1Pi/D4nGyMVTXbv/Kz6vvjVudKRtkTIso21ZvBqOOWQ5PyDLzm+ebomchj
    SHh/VzZpGhkdWtHUfcKc1H/hgBKueuqI6lfYygoKOhJJomIZeg0k9zfrtHOSewUj
    ...
    dHBzOi8vd3d3Lmdsb2JhbHNpZ24uY29tL3JlcG9zaXRvcnkvMDMGA1UdHwQsMCow
    KKAmoCSGImh0dHA6Ly9jcmwuZ2xvYmFsc2lnbi5uZXQvcm9vdC5jcmwwPQYIKwYB
    K1pp74P1S8SqtCr4fKGxhZSM9AyHDPSsQPhZSZg=
    ----------END CERTIFICATE-----
    
    # Trust chain root certificate
    ----------BEGIN CERTIFICATE-----
    MIIDdTCCAl2gAwIBAgILBAAAAAABFUtaw5QwDQYJKoZIhvcNAQEFBQAwVzELMAkG
    YWxTaWduIG52LXNhMRAwDgYDVQQLEwdSb290IENBMRswGQYDVQQDExJHbG9iYWxT
    aWduIFJvb3QgQ0EwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDaDuaZ
    ...
    jc6j40+Kfvvxi4Mla+pIH/EqsLmVEQS98GPR4mdmzxzdzxtIK+6NiY6arymAZavp
    38NflNUVyRRBnMRddWQVDf9VMOyGj/8N7yy5Y0b2qvzfvGn9LhJIZJrglfCm7ymP
    HMUfpIBvFSDJ3gyICh3WZlXi/EjJKSZp4A==
    ----------END CERTIFICATE-----
    
    어떻게 신뢰 체인을 구축합니까?우리는 체인의 밑부분에서 시작합니다 (상례의 서버 인증서). 그러나 파일의 방향은 반대입니다.
    서버 인증서를 받으려면 다음 명령을 실행할 수 있습니다.
    echo -n | openssl s_client -connect <endpoint taken from Stream details page>
    
    두 가지 중요한 정보가 반환됩니다.
  • 인증서 체인 섹션에는 인증서가 나열되어 있습니다.이 모든 것은 우리의 서류에 넣어야 한다.내 예에서는 다음과 같이 보입니다. (체인에 세 개의 인증서가 있습니다.)
  • Certificate chain
     0 s:/C=US/ST=California/L=Redwood City/O=Oracle Corporation/OU=Oracle OCI-PROD FRANKFURT/CN=streaming.eu-frankfurt-1.oci.oraclecloud.com
       i:/C=US/O=DigiCert Inc/CN=DigiCert SHA2 Secure Server CA
     1 s:/C=US/O=DigiCert Inc/CN=DigiCert SHA2 Secure Server CA
       i:/C=US/O=DigiCert Inc/OU=www.digicert.com/CN=DigiCert Global Root CA
     2 s:/C=US/O=DigiCert Inc/OU=www.digicert.com/CN=DigiCert Global Root CA
       i:/C=US/O=DigiCert Inc/OU=www.digicert.com/CN=DigiCert Global Root CA
    
  • 서버 인증서 - 인증서 행의 시작/종료 섹션입니다.이것은'인증서 체인'의 인증서 번호'0'입니다. 파일의 맨 위에 두어야 합니다.
  • 마지막 단계는 나머지 인증서 (1 및 2) 를 수집하여 파일에 넣는 것입니다.루트 인증서는 아래에 있습니다.
    겸사겸사 한마디 하다.
    테스트 목적으로 우리는 신속하고 더러운 해결 방법을 사용할 수 있다."enable.ssl.certificate.verification"을 "false"로 설정하면 서버 인증이 완전히 비활성화됩니다.

    인증 및 인증


    모든 플로팅 탱크 (또는 수요에 따라 플로팅 탱크) 를 위한 전용 사용자를 만드는 것을 강력히 권장합니다.
    이러한 사용자를 만들려면 다음을 수행해야 합니다.
  • 그를 위해 인증 영패를 생성하는데 이 영패는 비밀번호로 사용할 것이다.
  • OCI 정책을 사용하여 적절한 액세스 수준 부여
  • 카프카 클라이언트


    마지막으로 Kafka 클라이언트를 사용하여 저희 미디어 서비스에 연결을 시도해 보겠습니다.내 경우에는 Go 및Confluent package을 사용합니다.
    우리가 사용할 수 있는 생산자와 소비자 코드의 예는 다음과 같다.
  • Consumer
  • Producer
  • 소비자 구성


    consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
            //taken from Stream Pool details page
            "bootstrap.servers":                   "cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com:9092",
            //arbitrary value, when using consumer groups
            "group.id":                            "foo",
            //taken from Stream Pool details page
            "sasl.mechanisms":                     "PLAIN",
            //user authorized to read from the  stream(s) 
            "sasl.username":                       "[tenancy]/[user name]/[stream pool id]",
            //Auth Token for the user
            "sasl.password":                       "[token]",
            "enable.ssl.certificate.verification": "true",
            //Full path to the file with certificates that make up chain of trust
            "ssl.ca.location":                     "/dir/ca.pem",
            //taken from Stream Pool details page
            "security.protocol":                   "SASL_SSL",
            "auto.offset.reset":                   "earliest"})  
    
    //subscribe to given Stream (aka topic)
    err = consumer.SubscribeTopics([]string{"testStream"}, nil)
    

    생산자 배치


    p, err := kafka.NewProducer(&kafka.ConfigMap{
            //taken from Stream Pool details page
            "bootstrap.servers":                   "cell-1.streaming.eu-frankfurt-1.oci.oraclecloud.com:9092",
            //taken from Stream Pool details page
            "sasl.mechanisms":                     "PLAIN",
            //user authorized to write to the  stream(s) 
            "sasl.username":                       "[tenancy]/[user name]/[stream pool id]",
            //Auth Token for the user
            "sasl.password":                       "[token]",
            //example of how to disable server certificate validation
            //don't do it in production!
            "enable.ssl.certificate.verification": "false",
            //taken from Stream Pool details page
            "security.protocol":                   "SASL_SSL"})
    

    좋은 웹페이지 즐겨찾기