TIL#24 [Nest.js] Queue System feat.@nestjs/bull

34262 단어 queueMQNestJSMQ

개요

Queue자료구조에 대해 살표보고 Queue와 Stack 자료구조의 차이 그리고 왜 Queue System을 왜 사용해야 하는지 알아보는 시간을 갖도록 하겠습니다.

Queue 자료구조란?

컴퓨터의 기본적인 데이터 구조의 하나이며, FIFO(first in first out) 형태로 예를 들면 데이터를 담는 그릇이 있다고 치면 먼저 들어온 데이터를 먼저 꺼내는 형식의 자료구조 입니다.

Queue System을 사용하는 이유

  • 비동기 : Queue에 넣기 때문에 나중에 처리할 수 있습니다.

  • 탄력성 : 일부가 실패 시 전체는 영향을 받지 않습니다.

  • 과잉 : 실패할 경우 재실행이 가능합니다.

  • 확장성 : 다수의 프로세스들이 큐에 메시지를 보낼 수 있습니다.

가볍게 실습 해보도록 하겠습니다.

$ brew install redis local에 레디스를 설치해 줍니다.

$ brew services start redis Redis 설치
$ brew services start 명령어를 통해 Redis를 실행시켜 줍니다.

$ brew services stop redis Redis 종료
$ brew services restart redis Redis 리부팅

APPSTORE에서 Red 설치 Redis용 DB툴 이라고 보면 될 것 같슴다.

$ yarn add @nestjs/bull bull bull 설치
$ yarn add @types/bull --dev

app.module에 Bullmodule 추가 합니다.

//app.module.ts

import { Module } from '@nestjs/common';
import { TypeOrmModule } from '@nestjs/typeorm';
import { UserModule } from './user/user.module';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    TypeOrmModule.forRoot({
      type: 'sqlite',
      database: 'db',
      entities: [__dirname + '/**/*.entity{.ts,.js}'],
      synchronize: true,
    }),
    BullModule.forRoot({    
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
    UserModule,
  ],
  controllers: [],
  providers: [],
})
export class AppModule {}  

BullModule Options

 forRoot()메서드는 bull애플리케이션에 등록된 모든 대기열에서 사용할 패키지 구성 개체 를 등록하는 데 사용됩니다(달리 지정하지 않는 한). 구성 개체는 다음 속성으로 구성됩니다.

limiter: RateLimiter- 대기열의 작업이 처리되는 속도를 제어하는 ​​옵션입니다. 

redis: RedisOpts- Redis 연결을 구성하는 옵션입니다. prefix: string- 모든 대기열 키에 대한 접두사

defaultJobOptions: JobOpts- 새 작업에 대한 기본 설정을 제어하는 ​​옵션. 

settings: AdvancedSettings- 고급 대기열 구성 설정. 일반적으로 변경해서는 안 됩니다. 


user module에 queue작업 생성하는 모듈을 추가합니다.

//user.module.ts


import { Module } from '@nestjs/common';
import { UserController } from './user.controller';
import { UserService } from './user.service';
import { userCreator } from './implementations/user-creator';
import { TypeOrmModule } from '@nestjs/typeorm';
import { UserEntity } from './entities/user.entity';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    TypeOrmModule.forFeature([UserEntity]),
    BullModule.registerQueue({
      name: 'hassan',
    }),
  ],
  controllers: [UserController],
  providers: [UserService, userCreator],
})
export class UserModule {}

Queue 생성자 만들기 일해라핫산

//user.service.ts

Queue가 필요한 서비스영역에서 큐작업을 생성합니다.
Job객체에 유저를 생성하기 위한 데이터를 같이 담아서 사용할수 있습니다.

import { Injectable } from '@nestjs/common';
import { createDto } from './DTO/create-dto';
import { userCreator } from './implementations/user-creator';
import { InjectQueue } from '@nestjs/bull';
import { Queue } from 'bull';

@Injectable()
export class UserService {
  constructor(
    private userCreator: userCreator,
    @InjectQueue('hassan') private testQueue: Queue,
  ) {}

  async create(data: createDto): Promise<void> {
    const job = await this.testQueue.add('userCreate', data);
    return await this.userCreator.create(job);
  }
}

작업 옵션

작업에는 추가 옵션이 연결될 수 있습니다. 메서드 의 job인수 뒤에 옵션 개체를 전달합니다 Queue.add(). 작업 옵션 속성은 다음과 같습니다.

priority: number- 선택적 우선순위 값입니다. 범위는 1(가장 높은 우선 순위)에서 MAX_INT(가장 낮은 우선 순위)까지입니다. 우선 순위를 사용하면 성능에 약간의 영향을 미치므로 주의해서 사용하십시오.

delay: number- 이 작업을 처리할 수 있을 때까지 대기하는 시간(밀리초)입니다. 정확한 지연을 위해 서버와 클라이언트 모두 시계를 동기화해야 합니다.

attempts: number- 작업이 완료될 때까지 시도한 총 횟수입니다.

repeat: RepeatOpts- cron 사양에 따라 작업을 반복합니다. RepeatOpts를 참조하십시오 .

backoff: number | BackoffOpts- 작업이 실패할 경우 자동 재시도를 위한 백오프 설정. BackoffOpts를 참조하십시오 .

lifo: boolean- true인 경우 왼쪽 대신 대기열의 오른쪽 끝에 작업을 추가합니다(기본값은 false).

timeout: number- 작업이 시간 초과 오류와 함께 실패해야 하는 시간(밀리초)입니다.

jobId: number| string- 작업 ID 재정의 - 기본적으로 작업 ID는 고유한 정수이지만 이 설정을 사용하여 재정의할 수 있습니다. 이 옵션을 사용하는 경우 jobId가 고유한지 확인하는 것은 사용자에게 달려 있습니다. 이미 존재하는 ID로 작업을 추가하려고 하면 추가되지 않습니다.

removeOnComplete: boolean | number- true인 경우 성공적으로 완료되면 작업을 제거합니다. 숫자는 유지할 작업의 양을 지정합니다. 기본 동작은 작업을 완료된 세트로 유지하는 것입니다.

removeOnFail: boolean | number- true인 경우 모든 시도 후에 실패하면 작업을 제거합니다. 숫자는 유지할 작업의 양을 지정합니다. 기본 동작은 작업을 실패한 세트로 유지하는 것입니다.

stackTraceLimit: number- 스택 트레이스에 기록될 스택 트레이스 라인의 양을 제한합니다.


Queue 작업 목록이 잘 생깁니다~

Queue 소비자 생성

user.consumer.ts 파일을 user파일 안에 생성합니다. 'src/user/user.consumer.ts'
user.consumer.ts 파일안에 아래 내용을 작성합니다.


import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
import { userCreator } from './implementations/user-creator';

@Processor('hassan')          // 해당부분에서 'hassan' 이라는 queue name을 가진 큐를 불러옵니다.
export class UserConsumer {
  constructor(private readonly userCreator: userCreator) {}

  @Process('userCreate').    // 해당부분에서 'userCreate'라는 jobname을 가진 큐를 꺼내서 실행합니다.
  async creatUser(job: Job) {
    try {
      await this.userCreator.create(job.data) //서비스(생성자)에서 만든 큐 안에 데이터를 꺼내서 유저를 생성하도록 합니다.
      
    } catch (error) {
        console.info(error);
      }
    }
  }
 

서비스에서 받은 Job객체안의 data객체가 생성되며 data안에 객체가 담깁니다.

user.creator 작성

//user-creator.ts


import { InjectRepository } from '@nestjs/typeorm';
import { UserEntity } from '../entities/user.entity';
import { Injectable } from '@nestjs/common';
import { Repository } from 'typeorm';
import { Job } from 'bull';

@Injectable()
export class userCreator {
  constructor(
    @InjectRepository(UserEntity)
    private userEntityRepository: Repository<UserEntity>,
  ) {}

  async create(job: Job): Promise<void> {
    console.log(job);
    await this.userEntityRepository.insert(job.data);

    return;
  }
}

log로 job객체를 확인 해봅시다~

//job

Job {
  opts: {
    jobId: undefined,
    attempts: 1,
    delay: 0,
    timestamp: 1638040223695,
    backoff: undefined
  },
  name: 'userCreate',
  queue: Queue {
    name: 'hassan',
    token: '683ed31a-0339-45e5-8d9f-73aceeef1953',
    keyPrefix: 'bull',
    clients: [ [Redis] ],
    clientInitialized: true,
    _events: [Object: null prototype] {
      close: [Function],
      error: [Function (anonymous)]
    },
    _eventsCount: 2,
    _initializing: Promise { undefined },
    handlers: {},
    processing: [],
    retrieving: 0,
    drained: true,
    settings: {
      lockDuration: 30000,
      stalledInterval: 30000,
      maxStalledCount: 1,
      guardInterval: 5000,
      retryProcessDelay: 5000,
      drainDelay: 5,
      backoffStrategies: {},
      lockRenewTime: 15000
    },
    timers: TimerManager { idle: true, listeners: [], timers: {} },
    moveUnlockedJobsToWait: [Function: bound ],
    processJob: [Function: bound ],
    getJobFromId: [Function: bound ] AsyncFunction,
    keys: {
      '': 'bull:hassan:',
      active: 'bull:hassan:active',
      wait: 'bull:hassan:wait',
      waiting: 'bull:hassan:waiting',
      paused: 'bull:hassan:paused',
      resumed: 'bull:hassan:resumed',
      'meta-paused': 'bull:hassan:meta-paused',
      id: 'bull:hassan:id',
      delayed: 'bull:hassan:delayed',
      priority: 'bull:hassan:priority',
      'stalled-check': 'bull:hassan:stalled-check',
      completed: 'bull:hassan:completed',
      failed: 'bull:hassan:failed',
      stalled: 'bull:hassan:stalled',
      repeat: 'bull:hassan:repeat',
      limiter: 'bull:hassan:limiter',
      drained: 'bull:hassan:drained',
      progress: 'bull:hassan:progress'
    },
    onApplicationShutdown: [Function (anonymous)]
  },
  data: { nickname: 'que', password: 'kimbab', email: '[email protected]' },
  _progress: 0,
  delay: 0,
  timestamp: 1638040223695,
  stacktrace: [],
  returnvalue: null,
  attemptsMade: 0,
  toKey: [Function: wrapper],
  id: '3'
}

Reference:
https://docs.nestjs.com/techniques/queues#queue-management
https://velog.io/@peter0618/Nestjs-Queues

좋은 웹페이지 즐겨찾기