Redis Streams가 움직이고 있습니다 - 2부 (트위터 흐르는 미디어 API에서 소비하는 Rust 응용 프로그램)

이 일련의 블로그 글에 오신 것을 환영합니다. 실용적인 예를 통해 소개합니다Redis Streams.트위터 데이터를 실시간으로 검색하고 조회하는 예시 프로그램을 사용할 것입니다.RediSearchRedis Streams는 이 해결 방안의 주간이다. 이 해결 방안은 몇 개의 합작 구성 요소로 구성되어 있으며 우리는 전문적인 블로그 글에서 각 구성 요소를 소개할 것이다.

The code is available in this GitHub repo - https://github.com/abhirockzz/redis-streams-in-action


이 부분에서 트위터 흐름 API와 상호작용하는 서비스를 연구하고, 이 서비스는 트윗을 사용해 처리 파이프의 다음 부분으로 옮긴다.

우리의 최종 목표는 트윗을 처리하고 RediSearch를 통해 검색과 조회에 사용하는 것이다.추문을 소비하고 RediSearch에 직접 저장할 수 있는'무소불능'서비스를 만들 수 있다.그러나 추문을 처리하는 수량을 확대하기 위해서는 버퍼링 구역을 충당하고 생산자(본 블로그에서 중점적으로 소개할 응용 프로그램)와 소비자(다음 블로그에서 소개)를 결합시키는 서비스가 필요하다.
이것이 바로 우리의 첫 번째 구성 요소가 추진한 것이다. 트위터 데이터를 사용하여 Redis Streams에 전송하는 것이다.우리는 그것을 Azure Container Instances에 배치하여 그것의 기능을 검증하고 코드와 어떻게 함께 작동하는지 이해할 것이다.

As you will see in the later parts of this series, this also provides a foundation for scale-out architecture.


요컨대, 이 박문은 간단명료하다!그것은 해결 방안의 다른 부분에 기반을 다져 후속 글에서 소개할 것이다.걱정하지 마십시오. 서비스는 녹으로 쓰여 있습니다.논리는 당신이 가장 좋아하는 프로그래밍 언어에 쉽게 이식될 수 있다.

예비 지식


만약 네가 아직 free Azure accountinstall the Azure CLI가 없다면, 시작해라.
Google은 일반적인 Docker CLI 명령을 사용하여 Twitters 소비자 응용 프로그램을 Azure Container Instances 에 배치할 것입니다.이 기능은 integration between Docker and Azure에서 사용할 수 있습니다.Docker Desktop 2.3.0.5 이상의 버전이 Windows, macOS 또는 설치Docker ACI Integration CLI for Linux에 적용되는지 확인하십시오.
Twitter 스트리밍 API를 사용하려면 Twitter 개발자 계정이 필요합니다.만약 당신이 아직 없다면 어떻게 설정하는지 설명해 주십시오.

Azure 컨테이너 인스턴스에 적용 배포


우선 Redis를 위해 Azure Cache를 설치한 기업층using this quickstart.이 단계를 완료하면 Redis 호스트 이름 및 액세스 키 정보를 저장해야 합니다.

트위터 소비자 응용 프로그램은 Docker container 형식으로 제공된다. 가장 간단한 방법은 간단하게 반복해서 사용하는 것이다.자신의 이미지를 만들고 싶다면 사용하세요 Dockerfile available in the GitHub repo.
Azure 컨테이너에 배치하는 실례가 얼마나 편리한지 볼 수 있습니다. 이것은 서버가 없는 Azure 환경에서 Docker 컨테이너를 필요에 따라 실행할 수 있도록 합니다.
먼저 create an Azure context 컨테이너 인스턴스를 만들고 관리할 수 있도록 Azure 구독 및 리소스 그룹과 Docker를 연결합니다.
docker login azure
docker context create aci aci-context
docker context use aci-context
환경 변수 설정 - 계정에 따라 Redis 호스트 및 자격 증명이 업데이트되었는지 확인합니다.
export REDIS_HOSTNAME=<redis host port e.g. my-redis-host:10000>
export IS_TLS=true
export REDIS_PASSWORD=<redis access key (password)>

# don't forget your twitter api credentials
export TWITTER_API_KEY=<api key>
export TWITTER_API_KEY_SECRET=<api key secret>
export TWITTER_ACCESS_TOKEN=<access token>
export TWITTER_ACCESS_TOKEN_SECRET=<access token secret>
구버전docker run만 실행하면 됩니다.
docker run -d --name redis-streams-producer \
-e REDIS_HOSTNAME=$REDIS_HOSTNAME \
-e IS_TLS=$IS_TLS \
-e REDIS_PASSWORD=$REDIS_PASSWORD \
-e TWITTER_API_KEY=$TWITTER_API_KEY \
-e TWITTER_API_KEY_SECRET=$TWITTER_API_KEY_SECRET \
-e TWITTER_ACCESS_TOKEN=$TWITTER_ACCESS_TOKEN \
-e TWITTER_ACCESS_TOKEN_SECRET=$TWITTER_ACCESS_TOKEN_SECRET \
abhirockzz/tweets-redis-streams-producer-rust
이제 Azure에 컨테이너를 만들어야 합니다. 다음과 같은 출력을 볼 수 있습니다.
[+] Running 2/2
 ⠿ Group redis-streams-producer  Created                                                                             4.2s
 ⠿ redis-streams-producer        Created                                                                            15.8s
Azure 포털을 사용하여 이 작업을 확인합니다.

컨테이너 로그를 보려면 다음과 같이 하십시오.
docker logs redis-streams-producer

그럼, 그것은 효과가 있습니까?


그래, 그래야지!확인하려면 redis-cli를 사용하여 Redis 인스턴스에 연결합니다.
redis-cli -h <redis cache host> -p <redis port> -a <access key> --tls
... Redis Streams를 실행하려면 XRANGE 명령을 실행하십시오.
XRANGE tweets_stream - + COUNT 5
이것은 앞의 다섯 개의 추문을 되돌려줄 것이다.필요에 따라 변경할 수 있습니다COUNT.

The - and + special IDs mean respectively the minimum ID possible and the maximum ID possible inside the stream


이것이 바로 저희 프로그램이 트윗을 소비하고 Redis Streams에 추가할 수 있는지 확인해야 하는 내용입니다.앞에서 말한 바와 같이, 우리 해결 방안의 다른 구성 요소는 이 기초 위에 세워질 것이다.
응용 프로그램을 일시 중지하거나 삭제할 수 있습니다.
#to pause
docker stop redis-streams-producer

#to delete
docker rm redis-streams-producer
이제 애플리케이션이 어떻게 작동하는지 빠르게 살펴보겠습니다.만약 당신이 녹슨 코드를 탐색하는 것에 관심이 있다면, 그것은 매우 유용하다는 것을 발견할 수 있을 것이다.

코드 스트리밍


You can refer to the code here


이 응용 프로그램은 다음 라이브러리를 사용합니다.
  • 액세스용tokio
  • Rust 라이브러리

  • redis-rs, 고급 및 하위 API
  • 를 포함하는 Redis용 Rust 라이브러리

  • serdeserde json
  • 먼저 Redis와 Twitter를 연결합니다.
    fn connect_redis() -> redis::Connection {
        println!("Connecting to Redis");
        let redis_host_name =
            env::var("REDIS_HOSTNAME").expect("missing environment variable REDIS_HOSTNAME");
        let redis_password = env::var("REDIS_PASSWORD").unwrap_or_default();
    
        //if Redis server needs secure connection
        let uri_scheme = match env::var("IS_TLS") {
            Ok(_) => "rediss",
            Err(_) => "redis",
        };
    
        let redis_conn_url = format!("{}://:{}@{}", uri_scheme, redis_password, redis_host_name);
        println!("redis_conn_url {}", redis_conn_url);
    
        let client = redis::Client::open(redis_conn_url).expect("check Redis connection URL");
        client.get_connection().expect("failed to connect to Redis")
    }
    
    특정 키워드 또는 사용자 그룹을 따르지 않고 에 연결하기만 하면 약 1%의 트윗을 실시간으로 액세스할 수 있습니다.
        let token = twitter_token();
    
        TwitterStream::sample(&token)
            .try_flatten_stream()
            .try_for_each(|json| {
                let msg: model::StreamMessage =
                    serde_json::from_str(&json).expect("failed to convert tweet JSON to struct");
                process(msg, c.clone());
                future::ok(())
            })
            .await
            .expect("error connecting to Twitter stream!");
    
    대부분의 논리는 process 함수에 봉인되어 있다.우리 조금씩 한 번 봅시다.twitter-stream crate는 각 트윗을 원래 JSON 형식으로 반환합니다.이것은 model::StreamMessage로 변환되었는데 이것은 우리가 원시 추문에서 추출하고자 하는 데이터 모델링에 근거한 구조이다.
    우리는 serde_json를 사용하여 이 작업을 완성했다.
    serde_json::from_str(&json).expect("json to struct conversion failed");
    
    그리고 그것을 redis::Connection와 함께 process 함수에 전달한다.
    let conn = connect_redis();
    let c = Arc::new(Mutex::new(conn));
    ...
    fn process(msg: model::StreamMessage, conn: Arc<Mutex<redis::Connection>>) {
        //omitted
    }
    
    그런데 왜 그것을 ArcMutex 사이에 포장해야 합니까?redis::ConnectionFnMut에 전달해야 하기 때문이다.그것moves은 연결이기 때문에 우리는 Arc이 제공하는 공유 인용을 사용해야 한다.그러나 Arc는 부족하다. 왜냐하면 우리는 변이 데이터를 허용하지 않기 때문이다.따라서, 우리는 Mutex를 사용하여 연결 대상을 잠금합니다. Rust 컴파일러는 한 번에 한 라인만 접근할 수 있다는 것을 확신할 수 있습니다. (변하지 않음)
    가공 부분은 상대적으로 간단하다.이 모든 것은 xadd_map 함수를 사용하여 추문을 Redis 흐름에 추가하는 것에 관한 것이다.이것은 BTreeMap 을 받아들여서 model::StreamMessage 의 정보에 따라 트위터 텍스트, 트위터 사용자 (화면) 이름, ID, 위치, 탭 (있으면) 을 만듭니다.최종적으로, 우리의 목표는 RediSearch에서 그것들을 인덱스하고, 그것들을 유연하게 조회하는 것이다.
    let mut stream_entry: BTreeMap<String, String> = BTreeMap::new();
                    stream_entry.insert("id".to_string(), tweet.id.to_string());
                    stream_entry.insert("user".to_string(), tweet.user.screen_name);
                    stream_entry.insert("text".to_string(), tweet.text);
                    stream_entry.insert("location".to_string(), tweet.user.location);
    
    이 부분은 여기까지입니다.

    다음 거 계속...


    우리 이제 시작이야!이것은 저희 서비스의 첫 번째 구성 요소로 트윗을 처리하고 RediSearch 조회를 통해 트윗을 조회하는 데 기반을 다졌습니다.다가오는 블로그에서 자바 기반 앱을 어떻게 사용해 Redis Streams의 트윗을 소비하고 처리하는지 깊이 있게 연구할 것이다.기대해주세요!

    좋은 웹페이지 즐겨찾기