C 와 자바 가 그렇게 향 기 롭 지 않 습 니 다.Serverless 시대 Rust 가 곧 왕 이 될 것 입 니까?

15701 단어 CJavaServerlessRust
높 은 병발 모드 초기 분석
이 고 병발 시대 에 가장 중요 한 디자인 모델 은 생산자,소비자 모델 이다.예 를 들 어 유명한 정보 대열 인 kafka 는 사실은 생산자 소비자 모델 의 전형 적 인 실현 이다.사실은 생산자 소비자 문제,즉 유한 한 완충 문제 로 다음 과 같은 장면 으로 간략하게 설명 할 수 있다.생산 자 는 일 정량의 제품 을 창고 에 넣 고 이 과정 을 계속 반복 할 수 있다.이와 함께 소비자 들 도 완충 구역 에서 이런 데 이 터 를 소모 하지만 창고 의 크기 가 제한 되 어 있 기 때문에 생산자 와 소비자 간 의 보 조 는 조 화 롭 고 생산 자 는 창고 가 가득 찬 상황 에 포트 에 넣 지 않 으 며 소비자 들 도 창고 가 비 었 을 때 데 이 터 를 소모 하지 않 는 다.아래 그림 참조:

만약 에 생산자 와 소비자 사이 에서 완벽 하 게 조 화 를 이 루 고 효율 을 유지한다 면 이것 이 바로 높 은 병발 이 해결 해 야 할 본질 적 인 문제 이다.
C 언어의 높 은 병발 사례
필 자 는 앞에서 TDEngine 의 관련 코드 를 소개 한 적 이 있다.그 중에서 Sheduler 모듈 의 관련 스케줄 링 알고리즘 은 생산,소비자 모델 을 사용 하여 메시지 전달 기능 을 실현 했다.즉,여러 생산자(producer)가 생 성하 고 대열 에 메 시 지 를 계속 전달 하 며 여러 소비자(consumer)가 대열 에서 메 시 지 를 계속 얻 었 다.
그 다음 에 우 리 는 유형 기능 이 Go,자바 등 고급 언어 에서 유사 한 기능 이 이미 봉 인 된 것 을 설명 할 것 이다.그러나 C 언어 에서 너 는 반드시 상호 배척 체(mutex)와 신 호 량(semaphore)을 잘 사용 하고 그들의 관 계 를 조율 해 야 한다.C 언어의 실현 이 가장 복잡 하기 때문에 먼저 구조 체 디자인 과 그의 주석 을 살 펴 보 자.

typedef struct {
  char            label[16];//    
  sem_t           emptySem;//             
  sem_t           fullSem;//             
  pthread_mutex_t queueMutex;//               ,       
  int             fullSlot;//    
  int             emptySlot;//    
  int             queueSize;#    
  int             numOfThreads;//         
  pthread_t *     qthread;//    
  SSchedMsg *     queue;//    
} SSchedQueue;


다시 보면 Shceduler 초기 화 함 수 를 보십시오.여기 서 특별히 설명해 야 할 것 은 두 신 호 량 의 생 성 입 니 다.그 중에서 empty Sem 은 대기 열의 쓰기 가능 한 상태 이 고 초기 화 할 때 그 값 은 queue Size 입 니 다.즉,초기 대기 열 은 쓸 수 있 고 받 아들 일 수 있 는 메시지 길 이 는 대기 열의 읽 을 수 있 는 상태 입 니 다.초기 화 할 때 그 값 은 0 입 니 다.즉,초기 대기 열 은 읽 을 수 없습니다.구체 적 인 코드 와 나의 설명 은 다음 과 같다.

void *taosInitScheduler(int queueSize, int numOfThreads, char *label) {
  pthread_attr_t attr;
  SSchedQueue *  pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
  memset(pSched, 0, sizeof(SSchedQueue));
  pSched->queueSize = queueSize;
  pSched->numOfThreads = numOfThreads;
  strcpy(pSched->label, label);
  if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
    pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
   //emptySem        ,       queueSize,        ,            。
  if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
    pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
 //fullSem        ,       0,         
  if (sem_init(&pSched->fullSem, 0, 0) != 0) {
    pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
  if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
    pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
    goto _error;
  }
  memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
  pSched->fullSlot = 0;//        ,           0
  pSched->emptySlot = 0;//        ,           0
  pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);
  pthread_attr_init(&attr);
  pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
  for (int i = 0; i < pSched->numOfThreads; ++i) {
    if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
      pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
      goto _error;
    }
  }
  pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
  return (void *)pSched;
_error:
  taosCleanUpScheduler(pSched);
  return NULL;
}
메 시 지 를 읽 는 taosProcessSched Queue 함 수 를 보 겠 습 니 다.이것 은 사실 소비자 측의 실현 입 니 다.이 함수 의 주요 논 리 는?
1.무한 순환 사용,대기 열 만 읽 으 면 semwait(&pSched->fullSem)가 막 히 지 않 으 면 계속 아래로 처리 합 니 다.
2.msg 를 조작 하기 전에 상호 배척 체 를 넣 어 msg 가 오용 되 는 것 을 방지 합 니 다.
3.읽 기 작업 이 끝 난 후에 fullSlot 의 값 을 수정 합 니 다.fullSlot 가 넘 치지 않도록 quueSize 에 남 겨 두 어야 합 니 다.동시에 상호 배척 체 에서 탈퇴 하 다.
4.empty Sem 에 post 작업 을 진행 합 니 다.empty Sem 의 값 을 1 로 추가 합 니 다.예 를 들 어 empty Sem 의 원래 값 은 5 입 니 다.메 시 지 를 읽 은 후 empty Sem 의 값 은 6 입 니 다.상 태 를 쓸 수 있 고 받 아들 일 수 있 는 메시지 의 수량 은 6 입 니 다.
구체 적 인 코드 와 설명 은 다음 과 같다.

void *taosProcessSchedQueue(void *param) {
  SSchedMsg    msg;
  SSchedQueue *pSched = (SSchedQueue *)param;
 //          ,       sem_wait(&pSched->fullSem)         
  while (1) {
    if (sem_wait(&pSched->fullSem) != 0) {
      pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
      if (errno == EINTR) {
        /* sem_wait is interrupted by interrupt, ignore and continue */
        continue;
      }
    }
     //       msg   。
    if (pthread_mutex_lock(&pSched->queueMutex) != 0)
      pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
    msg = pSched->queue[pSched->fullSlot];
    memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
    //      fullSlot  ,      fullSlot  ,    queueSize  。
    pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
     //           
    if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
      pError("unlock %s queueMutex failed, reason:%s
", pSched->label, strerror(errno)); // emptySem post , emptySem 1, emptySem 5, ,emptySem 6, , 6 if (sem_post(&pSched->emptySem) != 0) pError("post %s emptySem failed, reason:%s
", pSched->label, strerror(errno)); if (msg.fp) (*(msg.fp))(&msg); else if (msg.tfp) (*(msg.tfp))(msg.ahandle, msg.thandle); } }
마지막 으로 메 시 지 를 쓰 는 taos ScheduleTask 함수 즉 생산의 실현 입 니 다.그 기본 논 리 는?
1.대기 열 을 쓰기 전에 empty Sem 을 1 로 줄 입 니 다.empty Sem 의 원래 값 이 1 이면 1 을 줄 인 후 0,즉 대기 열 이 가득 찼 습 니 다.메 시 지 를 읽 은 후 empty Sem 이 post 작업 을 한 후에 야 대기 열 이 쓸 수 있 습 니 다.
2.상호 배척 체 를 추가 하여 msg 가 오 작 동 하 는 것 을 방지 하고 기록 이 완료 되면 상호 배척 체 를 종료 합 니 다.
3.쓰기 대기 열 이 완 료 된 후 fullSem 에 1 을 추가 합 니 다.예 를 들 어 fullSem 의 원래 값 이 0 이면 1 을 추가 합 니 다.즉,대기 열 은 읽 을 수 있 습 니 다.우리 가 위 에서 소개 한 읽 기 taosProcessSched Queue 중 semwait(&pSched->fullSem)가 막 히 지 않 으 면 계속 아래로 내 려 갑 니 다.

int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
  SSchedQueue *pSched = (SSchedQueue *)qhandle;
  if (pSched == NULL) {
    pError("sched is not ready, msg:%p is dropped", pMsg);
    return 0;
  }
  //       emptySem   1  , emptySem   1,   1  0,       ,        , emptySem  post   ,          。
  if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
//       msg    
  if (pthread_mutex_lock(&pSched->queueMutex) != 0)
    pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
  pSched->queue[pSched->emptySlot] = *pMsg;
  pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
  if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
    pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
  //       fullSem   1  , fullSem   0,   1  1,       ,                 。
  if (sem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));
  return 0;
}
자바 의 높 은 병행 실현
동시 다발 모델 을 보면 Go 와 Rust 는 모두 channel 이라는 개념 을 가지 고 채널 을 통 해 선(협)간 의 동기 화 를 실현 한다.channel 은 읽 기와 쓰기 상 태 를 가지 고 데이터 순 서 를 확보 하 며 channel 의 패 키 징 정도 와 효율 이 현저히 높 기 때문에 Go 와 Rust 는 공식 적 으로 channel(통신)을 사용 하여 메모 리 를 공유 하 는 것 을 권장 한다.공유 메모리 로 통신 하 는 것 이 아니 라
여러분 이 차 이 를 찾 을 수 있 도록 자바 를 예 로 들 어 channel 이 없 는 고급 언어 자바,생산자 소비자 들 이 어떻게 실현 해 야 하 는 지,코드 와 주석 은 다음 과 같 습 니 다.

public class Storage {
 
    //        
    private final int MAX_SIZE = 10;
    //        
    private LinkedList<Object> list = new LinkedList<Object>();
    //  
    private final Lock lock = new ReentrantLock();
    //        
    private final Condition full = lock.newCondition();
    //        
    private final Condition empty = lock.newCondition();
 
    public void produce()
    {
        //    
        lock.lock();
        while (list.size() + 1 > MAX_SIZE) {
            System.out.println("【   " + Thread.currentThread().getName()
		             + "】    ");
            try {
                full.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.add(new Object());
        System.out.println("【   " + Thread.currentThread().getName() 
				 + "】      ,   " + list.size());
 
        empty.signalAll();
        lock.unlock();
    }
 
    public void consume()
    {
        //    
        lock.lock();
        while (list.size() == 0) {
            System.out.println("【   " + Thread.currentThread().getName()
		             + "】    ");
            try {
                empty.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        list.remove();
        System.out.println("【   " + Thread.currentThread().getName()
		         + "】      ,   " + list.size());
 
        full.signalAll();
        lock.unlock();
    }
}

자바,C\#이러한 대상 을 대상 으로 하지만 channel 언어 가 없 으 면 생산자,소비자 모델 은 적어도 하나의 lock 과 두 개의 신 호 량 을 통 해 공동으로 완성 해 야 한다.그 중에서 자물쇠 의 역할 은 같은 시간 을 확보 하 는 것 이다.창고 에 한 명의 사용자 만 데 이 터 를 수정 하고 창고 가 가득 찬 신 호 량 을 표시 해 야 한다.창고 가 가득 찬 상황 에 이 르 면 이 신 호 량 을 차단 상태 로 설정 하여 다른 생산자 가 창고 에 상품 을 운반 하 는 것 을 막 아야 한다.반면에 창고 가 비어 있 는 신 호 량 도 마찬가지다.창고 가 비어 있 으 면다른 소비자 들 이 다시 소비 하 는 것 도 막 아야 한다.
Go 의 높 은 병행 실현
우 리 는 방금 Go 언어 에서 공식 적 으로 channel 을 사용 하여 협 정 간 통신 을 실현 하 는 것 을 추 천 했 기 때문에 lock 과 신 호 량 을 추가 하지 않 아 도 모델 을 실현 할 수 있 습 니 다.다음 코드 에서 우 리 는 서브 goroutine 을 통 해 생산자 의 기능 을 완 성 했 고 다른 키 goroutine 에서 소비자 의 기능 을 실 현 했 습 니 다.메 인 goroutine 을 막 아 서브 goroutine 이 실 행 될 수 있 도록 해 야 합 니 다.이로써 생산자 소비자 모델 을 쉽게 완성 했다.구체 적 인 실천 을 통 해 생산자 소비자 모델 의 실현 을 살 펴 보 자.

package main

import (
	"fmt"
	"time"
)

func Product(ch chan<- int) { //   
	for i := 0; i < 3; i++ {
		fmt.Println("Product  produceed", i)
		ch <- i //  channel goroutine   ,               lock  .
	}
}
func Consumer(ch <-chan int) {
	for i := 0; i < 3; i++ {
		j := <-ch //  channel goroutine   ,               lock  .
		fmt.Println("Consmuer consumed ", j)
	}
}
func main() {
	ch := make(chan int)
	go Product(ch)//             goroutine 
	go Consumer(ch)//             goroutine 
	time.Sleep(time.Second * 1)//   goroutine  
	/*        ,   
		Product  produceed 0
	Product  produceed 1
	Consmuer consumed  0
	Consmuer consumed  1
	Product  produceed 2
	Consmuer consumed  2
	*/

}
자바 에 비해 GO 를 사용 하여 실현 하고 헤어스타일 을 하 는 생산자 소비자 모델 이 더욱 상쾌 해 지 는 것 을 볼 수 있다.
Rust 의 높 은 병발 실현
어 쩔 수 없 이 Rust 의 난이도 가 너무 높 습 니 다.필자 가 예전 에 어 셈 블 리,C,자바 등 분야 의 경험 을 통 해 저 는 Go 언어 를 신속하게 파악 하 는 데 도움 을 줄 수 있 지만.하지만 방학 동안 러 스 트 를 이틀 동안 보고 작별 인 사 를 하고 싶 었 다.니 마 는 너무 권고 했다.Rust 가 공식 적 으로 제공 하 는 기능 에서 사실은 다 생산자,다 소비자 의 channel 을 포함 하지 않 습 니 다.std:sync 공간 에서 다 생산자 단일 소비자(mpsc)의 channel 만 있 습 니 다.그 사례 는 다음 과 같다.

use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
    let (tx, rx) = mpsc::channel();
    let tx1 = mpsc::Sender::clone(&tx);
    let tx2 = mpsc::Sender::clone(&tx);
    thread::spawn(move || {
        let vals = vec![
            String::from("1"),
            String::from("3"),
            String::from("5"),
            String::from("7"),
        ];
        for val in vals {
            tx1.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    thread::spawn(move || {
        let vals = vec![
            String::from("11"),
            String::from("13"),
            String::from("15"),
            String::from("17"),
        ];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    thread::spawn(move || {
        let vals = vec![
            String::from("21"),
            String::from("23"),
            String::from("25"),
            String::from("27"),
        ];
        for val in vals {
            tx2.send(val).unwrap();
            thread::sleep(Duration::from_secs(1));
        }
    });
    for rec in rx {
        println!("Got: {}", rec);
    }
}
Rust 에서 생산자 소비 자 를 실현 하 는 것 은 어렵 지 않다 는 것 을 알 수 있 지만 생산 자 는 여러 개 를 복제 할 수 있 지만 소비 자 는 한 가지 만 있 을 수 있다.그 이 유 는 Rust 에서 GC 즉 쓰레기 회수 기능 이 없 기 때문에 안전 한 Rust 를 확보 하려 면 사용권 제한 변경 에 대해 엄격 한 관 리 를 해 야 하기 때문이다.Rust 에서 move 키 워드 를 사용 하여 변 경 된 소유권 이전 을 하지만 Rust 가 생산 주 기 를 변경 하 는 관리 규정 에 따라 스 레 드 간 권한 이전 의 소유권 수신 자 는 같은 시간 에 한 가지 만 있 을 수 있 습 니 다.이것 도 Rust 정부 가 MPSC 만 제공 하 는 이유 입 니 다.

use std::thread;
fn main() {
    let s = "hello";
    let handle = thread::spawn(move || {
        println!("{}", s);
    });
    handle.join().unwrap();
}
물론 Rust 의 다음 API 는 join 입 니 다.그 는 모든 하위 스 레 드 가 끝 난 후에 메 인 스 레 드 를 종료 할 수 있 습 니 다.이것 은 Go 에서 손 으로 차단 하 는 것 보다 어느 정도 향상 되 어야 합 니 다.만약 에 다 생산자,다 소비자 의 기능 을 사용 하려 면 crossbeam 모듈 을 구 해 야 한다.이 모듈 은 파악 하기 도 어렵 지 않다.
총결산
위의 비 교 를 통 해 우 리 는 표 한 장 으로 몇 가지 주류 언어의 상황 대 비 를 설명 할 수 있다.
언어.
안전성
운행 속도
프로 세 스 시작 속도
학습 난이도
C
낮다
매우 빠르다
매우 빠르다
어렵다
Java
높다
여 간
여 간
여 간
Go
높다
비교적 빠르다
비교적 빠르다
여 간
Rust
높다
매우 빠르다(기본 견갑 C)
매우 빠르다(기본 견갑 C)
극히 곤란 하 다
Rust 가 높 은 안전성,기본 적 인 어깨 비교 C 의 운행 과 작 동 속 도 는 반드시 Serverless 의 시대 에 앞장 설 것 이다.Go 는 기본적으로 그 뒤 를 따 를 수 있다.C 언어 프로그램 에서 피 할 수 없 는 야생 지침,자바 가 상대 적 으로 낮은 운행 과 작 동 속 도 는 함수 식 연산 에 사용 하기에 적합 하지 않 을 것 이다.자바 는 기업 급 개발 시대 에 각종 C\#와 같은 상 대 를 물 리 쳤 다.그러나 구름 시대 에는 예전 의 통 치 력 만큼 강하 지 않 은 것 같 습 니 다.당신 을 이 긴 것 은 당신 의 상대 가 아니 라 다른 공간의 강 차원 타격 이 라 고 할 수 있 습 니 다.
이 글 의 내용 은 여기까지 입 니 다.당신 에 게 도움 을 줄 수 있 기 를 바 랍 니 다.또한 당신 이 우리 의 더 많은 내용 에 관심 을 가 져 주 셨 으 면 좋 겠 습 니 다!

좋은 웹페이지 즐겨찾기