Apache Kafka ile veri yazma ve okuma

Apache Kafka üzerinde konular (topic) vasıtası ile veriler gönderebilir ve bu verileri okuyabiliriz.

이 프로그램은 python3-kafka 패키지를 사용하여 Python3을 사용하는 이전 버전에서 이전 버전의 프로그램을 실행하는 데 사용됩니다.

Bir önceki yazımızdaki docker konteynırı üzerinde yapabileceğiniz gibi aşağıda belirli ayarları yaparak kendi sunucunuzla da iletime geçebilirsiniz.

Python은 Kafka kütüphanesini yükleme에서



Python 3-kafka paketini kullanacağım을 사용하여 kafka kütüphanesi olmasına rağmen en çok kullanılanlardan birisi olan python3-kafka paketini kullanacağım. Kurulum için aşağıdaki komutun yazılması gerekecektir

sudo apt install python3-kafka


비단뱀은 비단뱀을 죽이지 않고 kafka kütüphanesini kullanabilir olacağız.

우레티치 일 베리 야즈마



"uretici.py"를 사용하면 기본적으로 사용할 수 있습니다. İsmi istediğiniz gibi verebilirsiniz ama kavram olarak üretici (프로듀서) Kafka anlatımlarında kullanıldığı için bu ismi kullandım. Bu dosya içeriği aşağıdaki gibi yapılabilir.

from kafka import KafkaProducer
import json

producer = KafkaProducer(
 bootstrap_servers='localhost:9092',
 value_serializer=lambda v: json.dumps(v).encode('ascii')
)

producer.send(
 'ornekinsanlar',
 value=
    {
     "name": "Ali Yazar",
     "yas": "12",
     "dogumyeri": "Zonguldak",
     "detay": "iyi bayramlar!"
     }
)
producer.flush()



Yazılan uygulamayı incelersek "ornekinsanlar"olarak yazılmış ifadenin Kafka üzerindeki bir konu (topic) olduğunu anlayabiliriz.

Üst kısımda KafkaProducer olarak da hangi sunucu adresine hangi port üzerinden erişilebileceğini sormakta. Docker üzerinde örnek yapıyorsanız aynı şekilde (localhost:9092) bırakmanız yeterli olacaktır.

Ayrıca Kafka'nın SSL desteği bulunduğu için normal şartlarda SSL ayarlarının da yapılması gerektiğini söylemem gerekir. Fakat örnek olması açısından bu ayarları es geçiyoruz.

Üretici uygulamamızın Kafka ile haberleşmesi sonrasında aşağıdaki gibi bir JSON formatının Kafka'ya gönderilmesi sağlanmakta.

    {
     "name": "Ali Yazar",
     "yas": "12",
     "dogumyeri": "Zonguldak",
     "detay": "iyi bayramlar!"
     }


Bu kısımdaki içerik uygulamanızın ihtiyaçlarını anlatmakta. İstediğiniz gibi oluşturabilirsiniz.

베리 야즈마 이슬레미



Dosyayı oluşturdu isek aşağıdaki gibi dosyanın içerisine yazdığımız json verisini Kafka'ya gönderebiliriz. İçeriğini değiştirerek birkaç kere çalıştırdığınızda peş peşe eklendiğini göreceksiniz.

python3 uretici.py


Tüketici 일 베리 오쿠마



Yazdığınız verilerin okunması kısmı da oldukça basit olmaktadır.

"tuketici.py"는 기본적으로 다음 작업을 수행할 때 사용할 수 있습니다. İsmi istediğiniz gibi verebilirsiniz ama kavram olarak tüketici (소비자) Kafka anlatımlarında kullanıldığı için bu ismi kullandım. Bu dosya içeriği aşağıdaki gibi yapılabilir.

from kafka import KafkaConsumer
from pprint import pprint

if __name__ == '__main__':
    consumer = KafkaConsumer('ornekinsanlar', bootstrap_servers="localhost:9092",
                             enable_auto_commit=False, auto_offset_reset="earliest")
    pprint(consumer.metrics())
    for msg in consumer:
        pprint(msg)


Yazılan uygulamayı incelersek "ornekinsanlar"olarak yazılmış ifadenin Kafka üzerindeki bir konu (topic) olduğunu anlayabiliriz.

Benzer şekilde Kafka생산자 olarak da hangi sunucu adresine hangi port üzerinden erişilebileceğini sormakta. Docker üzerinde örnek yapıyorsanız aynı şekilde (localhost:9092) bırakmanız yeterli olacaktır.

Kafka는 "consumer.metrics()"fonksiyonunu kullanıyoruz를 사용하여 계산된 값을 계산할 수 있습니다.

Daha sonrasında ise çektiğimiz konu (주제) içerisindeki verilerin for döngüsü içerisinde verilerinin tamamının çekileceğini görebiliriz.

python3 tuketici.py


Uygulamanın çıktısı aşağıdaki gibi olacaktır.

ConsumerRecord(topic='ornekinsanlar', partition=0, offset=0, timestamp=1657528880472, timestamp_type=0, key=None, value=b'{"name": "Ali Bir", "yas": "21", "dogumyeri": "Ankara", "detay": "iyi bayramlar!"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=87, serialized_header_size=-1)
ConsumerRecord(topic='ornekinsanlar', partition=0, offset=1, timestamp=1657529032891, timestamp_type=0, key=None, value=b'{"name": "Ali Yazar", "yas": "12", "dogumyeri": "Zonguldak", "detay": "iyi bayramlar!"}', headers=[], checksum=None, serialized_key_size=-1, serialized_value_size=87, serialized_header_size=-1)



Nam et ipsa scientia potestas est

좋은 웹페이지 즐겨찾기