Redis 분포 식 자물쇠 의 사용 과 실현 원리 에 대한 상세 한 설명

전자상거래 에서 재 고 를 줄 이 는 장면 을 모방 하 다.
1.우선 redis 에 상품 재고 수량 을 넣는다.

2.Spring Boot 프로젝트 를 새로 만 들 고 pom 에 의존 도 를 도입 합 니 다.

 <dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
 </dependency>

 <dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>
3.다음은 application.yml 에서 redis 속성 과 지정 한 포트 번 호 를 설정 합 니 다.

server:
 port: 8090

spring:
 redis:
 host: 192.168.0.60
 port: 6379
4.컨트롤 러 클래스 를 새로 만 들 고 재고 1 판 코드 를 줄 입 니 다.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.util.Objects;

@RestController
public class StockController {

 private static final Logger logger = LoggerFactory.getLogger(StockController.class);

 @Resource
 private StringRedisTemplate stringRedisTemplate;

 @RequestMapping("/reduceStock")
 public String reduceStock() {
 //  redis       
 int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
 if (stock > 0) {
  //    
  int restStock = stock - 1;
  //           redis 
  stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
  logger.info("    ,    :{}", restStock);
 } else {
  logger.info("    ,    。");
 }

 return "success";
 }
}
위의 1 판 코드 에 무슨 문제 가 있 습 니까?만약 에 여러 개의 스 레 드 가 동시에 재고 수량 을 얻 는 코드 를 호출 하면 모든 스 레 드 가 100 을 받 고 재고 가 0 보다 크다 고 판단 하면 재 고 를 줄 이 는 작업 을 수행 할 수 있다.만약 두 라인 이 모두 재 고 를 줄 이 고 캐 시 를 업데이트 한다 면 캐 시 재 고 는 99 가 되 지만 실제로는 2 개의 재 고 를 줄 여야 한다.
그러면 많은 사람들의 첫 번 째 생각 은 synchronized 동기 화 코드 블록 을 추가 하 는 것 입 니 다.수량 을 얻 고 재 고 를 줄 이 는 것 은 원자 적 인 작업 이 아니 기 때문에 여러 개의 스 레 드 가 코드 를 실행 할 때 하나의 스 레 드 만 코드 블록 에 있 는 코드 를 실행 할 수 있 습 니 다.그럼 고 친 2 판 코드 는 다음 과 같 습 니 다.

 @RequestMapping("/reduceStock")
 public String reduceStock() {
 synchronized (this) {
  //  redis       
  int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
  if (stock > 0) {
  //    
  int restStock = stock - 1;
  //           redis 
  stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
  logger.info("    ,    :{}", restStock);
  } else {
  logger.info("    ,    。");
  }
 }

 return "success";
 }
그러나 synchronize 를 사용 하 는 데 존재 하 는 문 제 는 단기 환경 이 운행 할 때 만 문제 가 없다 는 것 이다.그러나 현재 의 소프트웨어 회사 에 서 는 기본적으로 모두 집단 구조 이 고 다 중 실례 이다.앞에서 Nginx 를 사용 하여 부하 균형 을 잡 았 는데 대략 구 조 는 다음 과 같다.

Nginx 는 요청 을 다른 Tomcat 용기 에 보 내 고 synchronize 는 하나의 응용 프로그램 만 문제 가 없다 는 것 을 보증 할 수 있 습 니 다.
그러면 코드 개선 제3 판 은 바로 redis 분포 식 자 물 쇠 를 도입 하 는 것 입 니 다.구체 적 인 코드 는 다음 과 같 습 니 다.

 @RequestMapping("/reduceStock")
 public String reduceStock() {
 String lockKey = "stockKey";
 try {
  boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
  if (!result) {
  return "errorCode";
  }
  //  redis       
  int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
  if (stock > 0) {
  //    
  int restStock = stock - 1;
  //           redis 
  stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
  logger.info("    ,    :{}", restStock);
  } else {
  logger.info("    ,    。");
  }
 } finally {
  stringRedisTemplate.delete(lockKey)
 }
 return "success";
 }
만약 한 라인 이 자 물 쇠 를 가 져 오 면 다른 라인 은 기다 릴 것 이다.finally 에서 사용 한 자 물 쇠 를 지 워 야 한 다 는 것 을 기억 하 세 요.그렇지 않 으 면 이상 을 던 지면 한 스 레 드 만 자 물 쇠 를 계속 가지 고 있 고 다른 스 레 드 는 얻 을 기회 가 없습니다.
그러나if (stock > 0) {코드 블록 에 있 는 코드 를 실행 하면 지연 되 거나 재 부팅 이 완료 되 지 않 았 기 때문에 자 물 쇠 를 계속 가지 고 있 기 때문에 자 물 쇠 를 시간 초과 시간 을 추가 해 야 합 니 다.

 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
 stringRedisTemplate.expire(lockKey, 10, TimeUnit.SECONDS);
그러나 위의 두 줄 코드 가 중간 에 실행 되 는 데 문제 가 생기 면 시간 초과 코드 가 실행 되 지 않 아 자물쇠 가 풀 리 지 않 는 문제 가 발생 할 수 있 습 니 다.다행히 대응 하 는 방법 이 있 습 니 다.바로 위의 두 줄 코드 를 하나의 원자 조작 으로 설정 하 는 것 입 니 다.

 //            10 
 boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1", 10, TimeUnit.SECONDS);
여기까지 병발 량 이 많 지 않 으 면 거의 문제 가 없다.
그러나 요청 한 병발 량 이 많 으 면 새로운 문제 가 발생 할 수 있 습 니 다.비교적 특수 한 상황 이 있 습 니 다.첫 번 째 스 레 드 는 15 초 동안 실 행 했 지만 10 초 까지 실 행 했 을 때 자 물 쇠 는 효력 을 잃 고 풀 렸 습 니 다.그러면 높 은 병발 장면 에서 두 번 째 스 레 드 는 자물쇠 가 효력 을 잃 었 다 는 것 을 발견 하면 이 자 물 쇠 를 가 져 와 자 물 쇠 를 추가 할 수 있 습 니 다.
두 번 째 스 레 드 가 실행 되 는 데 8 초가 걸린다 고 가정 하면 5 초 후에 첫 번 째 스 레 드 가 실행 되 었 습 니 다.그 순간 에 key 를 삭제 하 는 작업 을 했 습 니 다.그러나 이때 의 자 물 쇠 는 두 번 째 스 레 드 에 추 가 된 것 입 니 다.그러면 첫 번 째 스 레 드 는 두 번 째 스 레 드 에 추 가 된 자 물 쇠 를 지 웠 습 니 다.
그것 은 세 번 째 스 레 드 가 자 물 쇠 를 가 져 올 수 있다 는 것 을 의미한다.세 번 째 스 레 드 는 3 초 동안 실행 되 었 다.이때 두 번 째 스 레 드 가 실행 되면 두 번 째 스 레 드 는 세 번 째 스 레 드 의 자 물 쇠 를 다시 삭제 했다.자물쇠 가 효력 을 상실 하 다.
그러면 해결 의 방향 은 바로 내 가 넣 은 자물쇠 가 다른 사람 에 게 지 워 지지 않도록 하 는 것 이다.들 어 오 는 요청 마다 유일한 id 를 만 들 수 있 습 니 다.분포 식 잠 금 값 으로 풀 때 현재 스 레 드 의 id 가 캐 시 에 있 는 id 와 같 는 지 판단 할 수 있 습 니 다.

 @RequestMapping("/reduceStock")
 public String reduceStock() {
 String lockKey = "stockKey";
 String id = UUID.randomUUID().toString();
 try {
  //            30 
  boolean result = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, id, 30, TimeUnit.SECONDS);
  if (!result) {
  return "errorCode";
  }
  //  redis       
  int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
  if (stock > 0) {
  //    
  int restStock = stock - 1;
  //           redis 
  stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
  logger.info("    ,    :{}", restStock);
  } else {
  logger.info("    ,    。");
  }
 } finally {
  if (id.contentEquals(Objects.requireNonNull(stringRedisTemplate.opsForValue().get(lockKey)))) {
  stringRedisTemplate.delete(lockKey);
  }
 }
 return "success";
 }
여기까지 비교적 완벽 한 자물쇠 가 실현 되 어 대부분의 장면 에 대처 할 수 있다.
물론 위의 코드 에 또 하나의 문제 가 있 습 니 다.바로 하나의 스 레 드 실행 시간 이 만 료 시간 을 넘 었 고 뒤의 코드 가 실행 되 지 않 았 습 니 다.자 물 쇠 는 이미 삭제 되 었 습 니 다.아니면 bug 가 존재 합 니까?해결 방법 은 자물쇠 의 생명 을 연장 하 는 작업 이다.
현재 메 인 스 레 드 에서 자 물 쇠 를 가 져 온 후에 fork 에서 스 레 드 를 내 고 Timer 타이머 작업 을 수행 할 수 있 습 니 다.만약 에 기본 시간 초과 가 30 초 라면 타이머 가 10 초 마다 이 자물쇠 가 존재 하 는 지 확인 할 수 있 습 니 다.존재 하 는 것 은 이 자물쇠 의 논리 가 아직 실행 되 지 않 았 다 는 것 을 설명 합 니 다.그러면 현재 메 인 스 레 드 의 시간 초과 시간 을 30 초 로 다시 설정 할 수 있 습 니 다.존재 하지 않 으 면 바로 끝난다.
그러나 위의 논 리 는 높 은 병발 장면 에서 비교적 완선 되 기 어렵다.다행히 지금 은 비교적 성숙 한 틀 이 있 는데 그것 이 바로 Redisson 이다.공식 주소https://redisson.org。
다음은 Redisson 으로 분포 식 잠 금 을 실현 합 니 다.
우선 의존 팩 도입:

  <dependency>
   <groupId>org.redisson</groupId>
   <artifactId>redisson</artifactId>
   <version>3.6.5</version>
  </dependency>
설정 클래스:

@Configuration
public class RedissonConfig {
 @Bean
 public Redisson redisson() {
  //     
  Config config = new Config();
  config.useSingleServer().setAddress("redis://192.168.0.60:6379").setDatabase(0);
  return (Redisson) Redisson.create(config);
 }
}
다음은 redisson 으로 위 에 있 는 재고 감축 작업 을 다시 쓰 겠 습 니 다.

 @Resource
 private Redisson redisson;
 
 @RequestMapping("/reduceStock")
 public String reduceStock() {
  String lockKey = "stockKey";
  RLock redissonLock = redisson.getLock(lockKey);
  try {
   //   ,   
   redissonLock.lock();
   //  redis       
   int stock = Integer.parseInt(Objects.requireNonNull(stringRedisTemplate.opsForValue().get("stockCount")));
   if (stock > 0) {
    //    
    int restStock = stock - 1;
    //           redis 
    stringRedisTemplate.opsForValue().set("stockCount", String.valueOf(restStock));
    logger.info("    ,    :{}", restStock);
   } else {
    logger.info("    ,    。");
   }
  } finally {
   redissonLock.unlock();
  }
  return "success";
 }
사실은 세 가지 절차 입 니 다.자 물 쇠 를 얻 고 자 물 쇠 를 넣 으 며 자 물 쇠 를 풀 어 줍 니 다.
먼저 Redisson 의 실현 원 리 를 간단하게 살 펴 보 자.

여기 서 먼저 Redis 의 많은 조작 은 Lua 스 크 립 트 를 사용 하여 원자 적 조작 을 실현 하 는데 Lua 문법 에 대해 인터넷 에서 관련 강 좌 를 찾 아 볼 수 있다.
Lua 스 크 립 트 를 사용 하 는 장점 은:
1.네트워크 비용 을 줄 이 고 여러 명령 을 한 번 에 요청 할 수 있 습 니 다.
2.원자 조작 을 실현 하면 Redis 는 Lua 스 크 립 트 를 하나의 전체 로 실행 합 니 다.
3.사 무 를 실현 하 는 데 Redis 자체 의 사무 기능 이 유한 하고 Lua 스 크 립 트 는 업무 의 일반적인 조작 을 실현 하 며 스크롤 백 도 지원 합 니 다.
그러나 루 아 는 실제로 많이 사용 하지 않 는 다.루 아 스 크 립 트 가 너무 오래 실행 되면 레 디 스 가 단일 스 레 드 이기 때문에 막 힐 수 있다.
마지막 으로 Redisson 분산 잠 금 코드 구현 에 대해 말씀 드 리 겠 습 니 다.
위의 redissonLock.lock()찾기;
lock 방법 은 RedissonLock 류 에 있 는 lock Interruptibly 방법 까지 점 을 찍 습 니 다.

 @Override
 public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
  //     id
  long threadId = Thread.currentThread().getId();
  Long ttl = tryAcquire(leaseTime, unit, threadId);
  // lock acquired
  if (ttl == null) {
   return;
  }

  RFuture<RedissonLockEntry> future = subscribe(threadId);
  commandExecutor.syncSubscription(future);

  try {
   while (true) {
    ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
     break;
    }

    // waiting for message
    if (ttl >= 0) {
     getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
    } else {
     getEntry(threadId).getLatch().acquire();
    }
   }
  } finally {
   unsubscribe(future, threadId);
  }
//  get(lockAsync(leaseTime, unit));
 }
try Acquire 방법 에 중심 을 두 고 스 레 드 id 를 매개 변수 로 전달 합 니 다.이 방법 에서 try Lock InnerAsync 방법 을 찾 아 넣 습 니 다.

 <T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
  internalLockLeaseTime = unit.toMillis(leaseTime);

  return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
     "if (redis.call('exists', KEYS[1]) == 0) then " +
      "redis.call('hset', KEYS[1], ARGV[2], 1); " +
      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
      "return nil; " +
     "end; " +
     "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
      "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
      "redis.call('pexpire', KEYS[1], ARGV[1]); " +
      "return nil; " +
     "end; " +
     "return redis.call('pttl', KEYS[1]);",
     Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
 }
여기 가 바로 Lua 스 크 립 트 입 니 다.첫 번 째 if 명령 을 보고 KEYS[1](해당 하 는 자물쇠 key 의 이름 입 니 다)를 판단 합 니 다.존재 하지 않 으 면 hashmap 에 스 레 드 id 로 속성 을 설정 하고 값 은 1 이 며 map 의 만 료 시간 을 internal LockLeaseTime 으로 설정 합 니 다.이 값 은 기본적으로 30 초 입 니 다.

위의 작업 에 대응 하 는 명령 은:

hset keyname id:thread 1
pexpire keyname 30
그리고 닐 로 돌아 갑 니 다.null 에 해당 하 는 프로그램 return 입 니 다.
또한 Redisson 은 잠 금 재 입 도 지원 합 니 다.두 번 째 if 는 잠 금 재 입 작업 을 수행 하 는 것 입 니 다.잠 금 이 존재 하 는 지 여 부 를 판단 하고 들 어 오 는 스 레 드 id 가 현재 스 레 드 의 id 인지 여 부 를 판단 합 니 다.만약 에 반복 적 으로 잠 금 을 추가 하여 자체 증가 작업 을 하 는 것 을 지원 합 니 다.
다른 스 레 드 가 lock 방법 을 호출 하면 위의 두 if 가 가지 않 을 것 이 라 고 판단 하면 잠 금 이 남 은 만 료 시간 을 되 돌려 줍 니 다.
이 어 try Acquire Async 방법 으로 돌아 가 아래 를 내 려 다 보 세 요.
실제로 모니터 를 추 가 했 습 니 다.모니터 에 중요 한 방법 이 있 습 니 다.schedule Expiration Renewal 은 이 이름 을 보면 어떤 기능 인지 대충 알 수 있 습 니 다.
안에 정시 미 션 에 대한 문의 가 있 습 니 다.

private void scheduleExpirationRenewal(final long threadId) {
  if (expirationRenewalMap.containsKey(getEntryName())) {
   return;
  }

  Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
   @Override
   public void run(Timeout timeout) throws Exception {
    //          id             id,   ,     ,  30 。
    RFuture<Boolean> future = commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
      "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
       "redis.call('pexpire', KEYS[1], ARGV[1]); " +
       "return 1; " +
      "end; " +
      "return 0;",
       Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
    
    future.addListener(new FutureListener<Boolean>() {
     @Override
     public void operationComplete(Future<Boolean> future) throws Exception {
      expirationRenewalMap.remove(getEntryName());
      if (!future.isSuccess()) {
       log.error("Can't update lock " + getName() + " expiration", future.cause());
       return;
      }
      
      if (future.getNow()) {
       // reschedule itself
       scheduleExpirationRenewal(threadId);
      }
     }
    });
   }
  }, internalLockLeaseTime / 3, TimeUnit.MILLISECONDS);

  if (expirationRenewalMap.putIfAbsent(getEntryName(), task) != null) {
   task.cancel();
  }
 }
이 어 10 초(internalLockLease Time/3)를 미 루 고 속 명 조작 논 리 를 집행 한다.
마지막 으로 lockInterruptibly 방법 으로 돌아 갑 니 다.ttl 이 null 이 라면 잠 금 추가 에 성공 했다 는 것 을 설명 하고 null 로 돌아 갑 니 다.다른 스 레 드 가 있 으 면 남 은 만 료 시간 을 되 돌려 줍 니 다.그러면 while 사 순환 에 들 어가 잠 금 을 추가 하고 try Acquire 방법 을 사용 하여 번 거 로 운 실효 후에 잠 금 을 가 져 오 려 고 합 니 다.
여기까지 분석 완료.
총결산
레 디 스 분포 식 잠 금 의 사용 과 실현 원리 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.레 디 스 분포 식 잠 금 의 사용 과 원리 에 관 한 내용 은 예전 의 글 을 검색 하거나 아래 의 관련 글 을 계속 조회 하 시기 바 랍 니 다.앞으로 많은 응원 부탁드립니다!

좋은 웹페이지 즐겨찾기