【프로젝트 실전】--익명 내부 클래스로 다중 스레드 조작 실현

3948 단어 프로젝트 실전
다음 [프로젝트 실전] - 인터페이스 인터페이스는 어떤 상황에서 new에 걸릴 수 있습니까?
처음에task-역사 메시지 압축 파일 작업을 수행할 때 익명 내부 클래스를 사용하여 LoopCall 방법을 실현했습니다.여기서 processPageData 메서드에서는 히스토리 메시지를 다중 스레드로 처리합니다.
	@Override
	public void execute(TaskContext ctx) {
		logger.info("          ");
		Date minTime = TimeUtil.minusDay(new Date(), keepMaxDay);

		LoopPageExecutor.execute(new LoopCall() {

			@Override
			public List getBatch(int batchSize) {
				List batchList = historyDao.queryExistSendTimeEalier(minTime, batchSize);
				logger.info("      :" + batchList.size());
				return batchList;
			}

			@Override
			public void processPageData(List data) {
				List> calls = new LinkedList<>();
				for (UserMsgHistory his : data) {
					calls.add(new MsgArchiveCall(his, minTime));

				}
				ConcurrentExecutor.execute(calls, 30);
			}

		}, 3000);

	}
	/**
	 *    ,    。
* calls , batchSize 。 ,
* * @param calls * @param batchSize * @return */ public static List execute(Collection> calls, int batchSize) { logger.debug(" , :{}", calls.size()); List resultColl = new CopyOnWriteArrayList(); if (CollectionUtils.isEmpty(calls)) { return resultColl; } List>> deviceCalls = CollectionConventer.devide(calls, batchSize); for (List> batch : deviceCalls) { List batchResult = execute(batch); resultColl.addAll(batchResult); } return resultColl; }
	/**
	 *    ,    。        ,      
* , 。
* * @param calls */ public static List execute(Collection> calls) { logger.debug(" , :{}", calls.size()); List resultColl = new CopyOnWriteArrayList(); if (CollectionUtils.isEmpty(calls)) { return resultColl; } if (calls.size() == 1) { // , ConcurrentCall call = calls.iterator().next(); try { CountDownLatch cdl = new CountDownLatch(1); call.setCdl(cdl); V v = call.call(); resultColl.add(v); } catch (Exception e) { e.printStackTrace(); logger.error(" ,{}", e); // , list } return resultColl; } else { CountDownLatch cdl = new CountDownLatch(calls.size()); List> futures = new LinkedList<>(); for (ConcurrentCall call : calls) { call.setCdl(cdl); Future future = executor.submit(call); futures.add(future); } try { logger.debug(" "); cdl.await(); logger.debug(" , "); for (Future f : futures) { resultColl.add(f.get()); } return resultColl; } catch (InterruptedException | ExecutionException e) { logger.warn(" ,{}", e); throw new PushInnerException(e); } } }
	/**
	 *          ,       
	 */
	@Data
	public static abstract class ConcurrentCall implements Callable {
		private CountDownLatch cdl;

		@Override
		public V call() throws Exception {
			try {
				V v = this.doCall();
				return v;
			} catch (Throwable t) {
				if (t instanceof PushInnerException) {
					logger.error(t.getMessage());
				} else {
					logger.error("unknown exception", new MailAlarmAppender.MailAlarmException(t));
				}

				throw new RuntimeException(t);
			} finally {
				this.cdl.countDown();
			}

		}

		public abstract V doCall() throws Exception;

	}

보충:
CopyOnWriteArrayList의 원리와 사용 방법
Java 동시 프로그래밍:Callable, Future 및 FutureTask
 

좋은 웹페이지 즐겨찾기