C 와 자바 가 그렇게 향 기 롭 지 않 습 니 다.Serverless 시대 Rust 가 곧 왕 이 될 것 입 니까?
15701 단어 CJavaServerlessRust
이 고 병발 시대 에 가장 중요 한 디자인 모델 은 생산자,소비자 모델 이다.예 를 들 어 유명한 정보 대열 인 kafka 는 사실은 생산자 소비자 모델 의 전형 적 인 실현 이다.사실은 생산자 소비자 문제,즉 유한 한 완충 문제 로 다음 과 같은 장면 으로 간략하게 설명 할 수 있다.생산 자 는 일 정량의 제품 을 창고 에 넣 고 이 과정 을 계속 반복 할 수 있다.이와 함께 소비자 들 도 완충 구역 에서 이런 데 이 터 를 소모 하지만 창고 의 크기 가 제한 되 어 있 기 때문에 생산자 와 소비자 간 의 보 조 는 조 화 롭 고 생산 자 는 창고 가 가득 찬 상황 에 포트 에 넣 지 않 으 며 소비자 들 도 창고 가 비 었 을 때 데 이 터 를 소모 하지 않 는 다.아래 그림 참조:
만약 에 생산자 와 소비자 사이 에서 완벽 하 게 조 화 를 이 루 고 효율 을 유지한다 면 이것 이 바로 높 은 병발 이 해결 해 야 할 본질 적 인 문제 이다.
C 언어의 높 은 병발 사례
필 자 는 앞에서 TDEngine 의 관련 코드 를 소개 한 적 이 있다.그 중에서 Sheduler 모듈 의 관련 스케줄 링 알고리즘 은 생산,소비자 모델 을 사용 하여 메시지 전달 기능 을 실현 했다.즉,여러 생산자(producer)가 생 성하 고 대열 에 메 시 지 를 계속 전달 하 며 여러 소비자(consumer)가 대열 에서 메 시 지 를 계속 얻 었 다.
그 다음 에 우 리 는 유형 기능 이 Go,자바 등 고급 언어 에서 유사 한 기능 이 이미 봉 인 된 것 을 설명 할 것 이다.그러나 C 언어 에서 너 는 반드시 상호 배척 체(mutex)와 신 호 량(semaphore)을 잘 사용 하고 그들의 관 계 를 조율 해 야 한다.C 언어의 실현 이 가장 복잡 하기 때문에 먼저 구조 체 디자인 과 그의 주석 을 살 펴 보 자.
typedef struct {
char label[16];//
sem_t emptySem;//
sem_t fullSem;//
pthread_mutex_t queueMutex;// ,
int fullSlot;//
int emptySlot;//
int queueSize;#
int numOfThreads;//
pthread_t * qthread;//
SSchedMsg * queue;//
} SSchedQueue;
다시 보면 Shceduler 초기 화 함 수 를 보십시오.여기 서 특별히 설명해 야 할 것 은 두 신 호 량 의 생 성 입 니 다.그 중에서 empty Sem 은 대기 열의 쓰기 가능 한 상태 이 고 초기 화 할 때 그 값 은 queue Size 입 니 다.즉,초기 대기 열 은 쓸 수 있 고 받 아들 일 수 있 는 메시지 길 이 는 대기 열의 읽 을 수 있 는 상태 입 니 다.초기 화 할 때 그 값 은 0 입 니 다.즉,초기 대기 열 은 읽 을 수 없습니다.구체 적 인 코드 와 나의 설명 은 다음 과 같다.
void *taosInitScheduler(int queueSize, int numOfThreads, char *label) {
pthread_attr_t attr;
SSchedQueue * pSched = (SSchedQueue *)malloc(sizeof(SSchedQueue));
memset(pSched, 0, sizeof(SSchedQueue));
pSched->queueSize = queueSize;
pSched->numOfThreads = numOfThreads;
strcpy(pSched->label, label);
if (pthread_mutex_init(&pSched->queueMutex, NULL) < 0) {
pError("init %s:queueMutex failed, reason:%s", pSched->label, strerror(errno));
goto _error;
}
//emptySem , queueSize, , 。
if (sem_init(&pSched->emptySem, 0, (unsigned int)pSched->queueSize) != 0) {
pError("init %s:empty semaphore failed, reason:%s", pSched->label, strerror(errno));
goto _error;
}
//fullSem , 0,
if (sem_init(&pSched->fullSem, 0, 0) != 0) {
pError("init %s:full semaphore failed, reason:%s", pSched->label, strerror(errno));
goto _error;
}
if ((pSched->queue = (SSchedMsg *)malloc((size_t)pSched->queueSize * sizeof(SSchedMsg))) == NULL) {
pError("%s: no enough memory for queue, reason:%s", pSched->label, strerror(errno));
goto _error;
}
memset(pSched->queue, 0, (size_t)pSched->queueSize * sizeof(SSchedMsg));
pSched->fullSlot = 0;// , 0
pSched->emptySlot = 0;// , 0
pSched->qthread = malloc(sizeof(pthread_t) * (size_t)pSched->numOfThreads);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (int i = 0; i < pSched->numOfThreads; ++i) {
if (pthread_create(pSched->qthread + i, &attr, taosProcessSchedQueue, (void *)pSched) != 0) {
pError("%s: failed to create rpc thread, reason:%s", pSched->label, strerror(errno));
goto _error;
}
}
pTrace("%s scheduler is initialized, numOfThreads:%d", pSched->label, pSched->numOfThreads);
return (void *)pSched;
_error:
taosCleanUpScheduler(pSched);
return NULL;
}
메 시 지 를 읽 는 taosProcessSched Queue 함 수 를 보 겠 습 니 다.이것 은 사실 소비자 측의 실현 입 니 다.이 함수 의 주요 논 리 는?1.무한 순환 사용,대기 열 만 읽 으 면 semwait(&pSched->fullSem)가 막 히 지 않 으 면 계속 아래로 처리 합 니 다.
2.msg 를 조작 하기 전에 상호 배척 체 를 넣 어 msg 가 오용 되 는 것 을 방지 합 니 다.
3.읽 기 작업 이 끝 난 후에 fullSlot 의 값 을 수정 합 니 다.fullSlot 가 넘 치지 않도록 quueSize 에 남 겨 두 어야 합 니 다.동시에 상호 배척 체 에서 탈퇴 하 다.
4.empty Sem 에 post 작업 을 진행 합 니 다.empty Sem 의 값 을 1 로 추가 합 니 다.예 를 들 어 empty Sem 의 원래 값 은 5 입 니 다.메 시 지 를 읽 은 후 empty Sem 의 값 은 6 입 니 다.상 태 를 쓸 수 있 고 받 아들 일 수 있 는 메시지 의 수량 은 6 입 니 다.
구체 적 인 코드 와 설명 은 다음 과 같다.
void *taosProcessSchedQueue(void *param) {
SSchedMsg msg;
SSchedQueue *pSched = (SSchedQueue *)param;
// , sem_wait(&pSched->fullSem)
while (1) {
if (sem_wait(&pSched->fullSem) != 0) {
pError("wait %s fullSem failed, errno:%d, reason:%s", pSched->label, errno, strerror(errno));
if (errno == EINTR) {
/* sem_wait is interrupted by interrupt, ignore and continue */
continue;
}
}
// msg 。
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
msg = pSched->queue[pSched->fullSlot];
memset(pSched->queue + pSched->fullSlot, 0, sizeof(SSchedMsg));
// fullSlot , fullSlot , queueSize 。
pSched->fullSlot = (pSched->fullSlot + 1) % pSched->queueSize;
//
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%s
", pSched->label, strerror(errno));
// emptySem post , emptySem 1, emptySem 5, ,emptySem 6, , 6
if (sem_post(&pSched->emptySem) != 0)
pError("post %s emptySem failed, reason:%s
", pSched->label, strerror(errno));
if (msg.fp)
(*(msg.fp))(&msg);
else if (msg.tfp)
(*(msg.tfp))(msg.ahandle, msg.thandle);
}
}
마지막 으로 메 시 지 를 쓰 는 taos ScheduleTask 함수 즉 생산의 실현 입 니 다.그 기본 논 리 는?1.대기 열 을 쓰기 전에 empty Sem 을 1 로 줄 입 니 다.empty Sem 의 원래 값 이 1 이면 1 을 줄 인 후 0,즉 대기 열 이 가득 찼 습 니 다.메 시 지 를 읽 은 후 empty Sem 이 post 작업 을 한 후에 야 대기 열 이 쓸 수 있 습 니 다.
2.상호 배척 체 를 추가 하여 msg 가 오 작 동 하 는 것 을 방지 하고 기록 이 완료 되면 상호 배척 체 를 종료 합 니 다.
3.쓰기 대기 열 이 완 료 된 후 fullSem 에 1 을 추가 합 니 다.예 를 들 어 fullSem 의 원래 값 이 0 이면 1 을 추가 합 니 다.즉,대기 열 은 읽 을 수 있 습 니 다.우리 가 위 에서 소개 한 읽 기 taosProcessSched Queue 중 semwait(&pSched->fullSem)가 막 히 지 않 으 면 계속 아래로 내 려 갑 니 다.
int taosScheduleTask(void *qhandle, SSchedMsg *pMsg) {
SSchedQueue *pSched = (SSchedQueue *)qhandle;
if (pSched == NULL) {
pError("sched is not ready, msg:%p is dropped", pMsg);
return 0;
}
// emptySem 1 , emptySem 1, 1 0, , , emptySem post , 。
if (sem_wait(&pSched->emptySem) != 0) pError("wait %s emptySem failed, reason:%s", pSched->label, strerror(errno));
// msg
if (pthread_mutex_lock(&pSched->queueMutex) != 0)
pError("lock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
pSched->queue[pSched->emptySlot] = *pMsg;
pSched->emptySlot = (pSched->emptySlot + 1) % pSched->queueSize;
if (pthread_mutex_unlock(&pSched->queueMutex) != 0)
pError("unlock %s queueMutex failed, reason:%s", pSched->label, strerror(errno));
// fullSem 1 , fullSem 0, 1 1, , 。
if (sem_post(&pSched->fullSem) != 0) pError("post %s fullSem failed, reason:%s", pSched->label, strerror(errno));
return 0;
}
자바 의 높 은 병행 실현동시 다발 모델 을 보면 Go 와 Rust 는 모두 channel 이라는 개념 을 가지 고 채널 을 통 해 선(협)간 의 동기 화 를 실현 한다.channel 은 읽 기와 쓰기 상 태 를 가지 고 데이터 순 서 를 확보 하 며 channel 의 패 키 징 정도 와 효율 이 현저히 높 기 때문에 Go 와 Rust 는 공식 적 으로 channel(통신)을 사용 하여 메모 리 를 공유 하 는 것 을 권장 한다.공유 메모리 로 통신 하 는 것 이 아니 라
여러분 이 차 이 를 찾 을 수 있 도록 자바 를 예 로 들 어 channel 이 없 는 고급 언어 자바,생산자 소비자 들 이 어떻게 실현 해 야 하 는 지,코드 와 주석 은 다음 과 같 습 니 다.
public class Storage {
//
private final int MAX_SIZE = 10;
//
private LinkedList<Object> list = new LinkedList<Object>();
//
private final Lock lock = new ReentrantLock();
//
private final Condition full = lock.newCondition();
//
private final Condition empty = lock.newCondition();
public void produce()
{
//
lock.lock();
while (list.size() + 1 > MAX_SIZE) {
System.out.println("【 " + Thread.currentThread().getName()
+ "】 ");
try {
full.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(new Object());
System.out.println("【 " + Thread.currentThread().getName()
+ "】 , " + list.size());
empty.signalAll();
lock.unlock();
}
public void consume()
{
//
lock.lock();
while (list.size() == 0) {
System.out.println("【 " + Thread.currentThread().getName()
+ "】 ");
try {
empty.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.remove();
System.out.println("【 " + Thread.currentThread().getName()
+ "】 , " + list.size());
full.signalAll();
lock.unlock();
}
}
자바,C\#이러한 대상 을 대상 으로 하지만 channel 언어 가 없 으 면 생산자,소비자 모델 은 적어도 하나의 lock 과 두 개의 신 호 량 을 통 해 공동으로 완성 해 야 한다.그 중에서 자물쇠 의 역할 은 같은 시간 을 확보 하 는 것 이다.창고 에 한 명의 사용자 만 데 이 터 를 수정 하고 창고 가 가득 찬 신 호 량 을 표시 해 야 한다.창고 가 가득 찬 상황 에 이 르 면 이 신 호 량 을 차단 상태 로 설정 하여 다른 생산자 가 창고 에 상품 을 운반 하 는 것 을 막 아야 한다.반면에 창고 가 비어 있 는 신 호 량 도 마찬가지다.창고 가 비어 있 으 면다른 소비자 들 이 다시 소비 하 는 것 도 막 아야 한다.Go 의 높 은 병행 실현
우 리 는 방금 Go 언어 에서 공식 적 으로 channel 을 사용 하여 협 정 간 통신 을 실현 하 는 것 을 추 천 했 기 때문에 lock 과 신 호 량 을 추가 하지 않 아 도 모델 을 실현 할 수 있 습 니 다.다음 코드 에서 우 리 는 서브 goroutine 을 통 해 생산자 의 기능 을 완 성 했 고 다른 키 goroutine 에서 소비자 의 기능 을 실 현 했 습 니 다.메 인 goroutine 을 막 아 서브 goroutine 이 실 행 될 수 있 도록 해 야 합 니 다.이로써 생산자 소비자 모델 을 쉽게 완성 했다.구체 적 인 실천 을 통 해 생산자 소비자 모델 의 실현 을 살 펴 보 자.
package main
import (
"fmt"
"time"
)
func Product(ch chan<- int) { //
for i := 0; i < 3; i++ {
fmt.Println("Product produceed", i)
ch <- i // channel goroutine , lock .
}
}
func Consumer(ch <-chan int) {
for i := 0; i < 3; i++ {
j := <-ch // channel goroutine , lock .
fmt.Println("Consmuer consumed ", j)
}
}
func main() {
ch := make(chan int)
go Product(ch)// goroutine
go Consumer(ch)// goroutine
time.Sleep(time.Second * 1)// goroutine
/* ,
Product produceed 0
Product produceed 1
Consmuer consumed 0
Consmuer consumed 1
Product produceed 2
Consmuer consumed 2
*/
}
자바 에 비해 GO 를 사용 하여 실현 하고 헤어스타일 을 하 는 생산자 소비자 모델 이 더욱 상쾌 해 지 는 것 을 볼 수 있다.Rust 의 높 은 병발 실현
어 쩔 수 없 이 Rust 의 난이도 가 너무 높 습 니 다.필자 가 예전 에 어 셈 블 리,C,자바 등 분야 의 경험 을 통 해 저 는 Go 언어 를 신속하게 파악 하 는 데 도움 을 줄 수 있 지만.하지만 방학 동안 러 스 트 를 이틀 동안 보고 작별 인 사 를 하고 싶 었 다.니 마 는 너무 권고 했다.Rust 가 공식 적 으로 제공 하 는 기능 에서 사실은 다 생산자,다 소비자 의 channel 을 포함 하지 않 습 니 다.std:sync 공간 에서 다 생산자 단일 소비자(mpsc)의 channel 만 있 습 니 다.그 사례 는 다음 과 같다.
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
let tx2 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("1"),
String::from("3"),
String::from("5"),
String::from("7"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("11"),
String::from("13"),
String::from("15"),
String::from("17"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("21"),
String::from("23"),
String::from("25"),
String::from("27"),
];
for val in vals {
tx2.send(val).unwrap();
thread::sleep(Duration::from_secs(1));
}
});
for rec in rx {
println!("Got: {}", rec);
}
}
Rust 에서 생산자 소비 자 를 실현 하 는 것 은 어렵 지 않다 는 것 을 알 수 있 지만 생산 자 는 여러 개 를 복제 할 수 있 지만 소비 자 는 한 가지 만 있 을 수 있다.그 이 유 는 Rust 에서 GC 즉 쓰레기 회수 기능 이 없 기 때문에 안전 한 Rust 를 확보 하려 면 사용권 제한 변경 에 대해 엄격 한 관 리 를 해 야 하기 때문이다.Rust 에서 move 키 워드 를 사용 하여 변 경 된 소유권 이전 을 하지만 Rust 가 생산 주 기 를 변경 하 는 관리 규정 에 따라 스 레 드 간 권한 이전 의 소유권 수신 자 는 같은 시간 에 한 가지 만 있 을 수 있 습 니 다.이것 도 Rust 정부 가 MPSC 만 제공 하 는 이유 입 니 다.
use std::thread;
fn main() {
let s = "hello";
let handle = thread::spawn(move || {
println!("{}", s);
});
handle.join().unwrap();
}
물론 Rust 의 다음 API 는 join 입 니 다.그 는 모든 하위 스 레 드 가 끝 난 후에 메 인 스 레 드 를 종료 할 수 있 습 니 다.이것 은 Go 에서 손 으로 차단 하 는 것 보다 어느 정도 향상 되 어야 합 니 다.만약 에 다 생산자,다 소비자 의 기능 을 사용 하려 면 crossbeam 모듈 을 구 해 야 한다.이 모듈 은 파악 하기 도 어렵 지 않다.총결산
위의 비 교 를 통 해 우 리 는 표 한 장 으로 몇 가지 주류 언어의 상황 대 비 를 설명 할 수 있다.
언어.
안전성
운행 속도
프로 세 스 시작 속도
학습 난이도
C
낮다
매우 빠르다
매우 빠르다
어렵다
Java
높다
여 간
여 간
여 간
Go
높다
비교적 빠르다
비교적 빠르다
여 간
Rust
높다
매우 빠르다(기본 견갑 C)
매우 빠르다(기본 견갑 C)
극히 곤란 하 다
Rust 가 높 은 안전성,기본 적 인 어깨 비교 C 의 운행 과 작 동 속 도 는 반드시 Serverless 의 시대 에 앞장 설 것 이다.Go 는 기본적으로 그 뒤 를 따 를 수 있다.C 언어 프로그램 에서 피 할 수 없 는 야생 지침,자바 가 상대 적 으로 낮은 운행 과 작 동 속 도 는 함수 식 연산 에 사용 하기에 적합 하지 않 을 것 이다.자바 는 기업 급 개발 시대 에 각종 C\#와 같은 상 대 를 물 리 쳤 다.그러나 구름 시대 에는 예전 의 통 치 력 만큼 강하 지 않 은 것 같 습 니 다.당신 을 이 긴 것 은 당신 의 상대 가 아니 라 다른 공간의 강 차원 타격 이 라 고 할 수 있 습 니 다.
이 글 의 내용 은 여기까지 입 니 다.당신 에 게 도움 을 줄 수 있 기 를 바 랍 니 다.또한 당신 이 우리 의 더 많은 내용 에 관심 을 가 져 주 셨 으 면 좋 겠 습 니 다!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Visual Studio에서 파일 폴더 구분 (포함 경로 설정)Visual Studio에서 c, cpp, h, hpp 파일을 폴더로 나누고 싶었습니까? 어쩌면 대부분의 사람들이 있다고 생각합니다. 처음에 파일이 만들어지는 장소는 프로젝트 파일 등과 같은 장소에 있기 때문에 파일...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.