간단한 SEDA 프레임워크 구현

10797 단어 code

/**
 * A stage container: an identifiable component that can be started,
 * stopped, and handed events for processing.
 *
 * @author wuyuhou
 *
 */
public interface IStagedContainer {

	/**
	 * Returns the identifier of this container.
	 *
	 * @return the container id
	 */
	String getId();

	/**
	 * Starts the container.
	 */
	void start();

	/**
	 * Stops the container.
	 */
	void stop();

	/**
	 * Hands an event to this container for processing.
	 *
	 * @param e the event to submit
	 */
	void sendEvent(IEvent e);
}

/**
 * An event passed between stages. It carries an identifier, an arbitrary
 * data object and an optional {@link Throwable}.
 *
 * @author yourname (mailto:[email protected])
 */
public interface IEvent {

	/**
	 * Returns the identifier of this event.
	 *
	 * @return the event id
	 */
	String getId();

	/**
	 * Returns the data attached to this event, typed as the caller expects.
	 * NOTE(review): the target type is chosen by the call site, so
	 * implementations presumably perform an unchecked cast - confirm.
	 *
	 * @param <T> the type the caller expects
	 * @return the attached data
	 */
	<T> T getData();

	/**
	 * Attaches data to this event.
	 *
	 * @param data the data object to attach
	 */
	void setData(Object data);

	/**
	 * Returns the exception recorded on this event.
	 *
	 * @return the recorded exception
	 */
	Throwable getException();

	/**
	 * Records an exception on this event.
	 *
	 * @param exception the exception to record
	 */
	void setException(Throwable exception);
}

/**
 * Processes a single {@link IEvent}.
 *
 * @author yourname (mailto:[email protected])
 */
public interface IEventHandler {

	/**
	 * Handles the given event.
	 *
	 * @param event the event to process
	 */
	void handleEvent(IEvent event);
}

/**
 * Callback that is invoked with an {@link IEvent}.
 *
 * @author wuyuhou
 *
 */
public interface IEventCallback {

	/**
	 * Invoked with the event the callback is being notified about.
	 *
	 * @param event the event passed to the callback
	 */
	void callback(IEvent event);
}

/**
 * Routes an {@link IEvent} onward.
 *
 * @author wuyuhou
 *
 */
public interface IEventRouter {

	/**
	 * Routes the given event.
	 *
	 * @param event the event to route
	 */
	void route(IEvent event);
}

/**
 * Default {@link IStagedContainer} implementation.
 * <p>
 * Events passed to {@link #sendEvent(IEvent)} are offered to a queue. A single
 * dispatcher thread polls that queue and submits each event to an
 * {@link Executor}. On a worker thread the event is first checked against
 * {@link CancelEventCache}; if it is not cancelled, the configured
 * {@link IEventHandler} is invoked and then, even if the handler throws, the
 * configured {@link IEventRouter}.
 * <p>
 * {@link #start()} and {@link #stop()} are synchronized and idempotent.
 *
 * @author yourname (mailto:[email protected])
 */
public class StagedContainer implements IStagedContainer {
	
	private static final ILogger log = DebugLoggerFactory.getLogger(StagedContainer.class);
	
	// Container identifier; never null or blank (enforced by the constructor).
	private String id = null;
	
	// Runs the per-event work; a fixed pool is created in start() if none was set.
	private Executor executor = null;
	
	// Buffers submitted events; a persistence queue is created in start() if none was set.
	private IQueue<IEvent> queue = null;
	
	// User-supplied handler; may be null, in which case events are only routed.
	private IEventHandler eventHandler = null;
	
	// User-supplied router; may be null, in which case events are only handled.
	private IEventRouter eventRouter = null;
	
	// Dispatcher thread that moves events from the queue to the executor.
	private EventHandelMainThread eventHandelMainThread = null;	
	
	// Milliseconds the dispatcher sleeps when the queue is empty.
	private int idleTime = 1000;

	// Written by start()/stop() and read by sendEvent() from arbitrary threads,
	// hence volatile.
	private volatile boolean isStarted = false;
	
	/**
	 * Creates a container with the given identifier.
	 *
	 * @param id the container id; must not be null or blank
	 * @throws IllegalArgumentException if {@code id} is null or blank
	 */
	public StagedContainer(String id) {
		if (id == null || id.trim().length() == 0) {
			throw new IllegalArgumentException("StagedContainerId is null!");
		}
		this.id = id;
	}
	
	/**
	 * @return the identifier given at construction time
	 */
	public String getId() {
		return id;
	} 

	/**
	 * @return the executor currently in use, or null if none is set
	 */
	protected Executor getExecutor() {		
		return executor;
	}

	/**
	 * @return the event queue currently in use, or null if none is set
	 */
	protected IQueue<IEvent> getQueue() {
		return queue;
	}
	
	/**
	 * Sets the executor used to run event handling.
	 *
	 * @param executor the executor; must not be null
	 * @throws IllegalArgumentException if {@code executor} is null
	 */
	protected void setExecutor(Executor executor) {
		if (executor == null) {
			throw new IllegalArgumentException("executor is null!");
		}
		this.executor = executor;
	}

	/**
	 * Sets the queue used to buffer events.
	 *
	 * @param queue the queue; must not be null
	 * @throws IllegalArgumentException if {@code queue} is null
	 */
	protected void setQueue(IQueue<IEvent> queue) {
		if (queue == null) {
			throw new IllegalArgumentException("queue is null!");
		}
		this.queue = queue;
	}

	/**
	 * @return the configured event handler, or null if none is set
	 */
	public IEventHandler getEventHandler() {
		return eventHandler;
	}

	/**
	 * Sets the handler invoked for every non-cancelled event.
	 *
	 * @param eventHandler the handler; must not be null
	 * @throws IllegalArgumentException if {@code eventHandler} is null
	 */
	public void setEventHandler(IEventHandler eventHandler) {
		if (eventHandler == null) {
			throw new IllegalArgumentException("eventHandler is null!");
		}
		this.eventHandler = eventHandler;
	}
	
	/**
	 * @return the configured event router, or null if none is set
	 */
	public IEventRouter getEventRouter() {
		return eventRouter;
	}

	/**
	 * Sets the router invoked after the handler for every non-cancelled event.
	 *
	 * @param eventRouter the router; must not be null
	 * @throws IllegalArgumentException if {@code eventRouter} is null
	 */
	public void setEventRouter(IEventRouter eventRouter) {
		if (eventRouter == null) {
			throw new IllegalArgumentException("eventRouter is null!");
		}
		this.eventRouter = eventRouter;
	}

	/**
	 * @return the dispatcher idle sleep time in milliseconds
	 */
	public int getIdleTime() {
		return idleTime;
	}

	/**
	 * Sets how long the dispatcher sleeps when the queue is empty. The value is
	 * read when {@link #start()} creates the dispatcher, so changing it on a
	 * running container has no effect until the next start.
	 *
	 * @param idleTime sleep time in milliseconds; must be greater than zero
	 * @throws IllegalArgumentException if {@code idleTime} is zero or negative
	 */
	public void setIdleTime(int idleTime) {
		if (idleTime <= 0) {
			// The previous message ("is not less than zero") contradicted the check.
			throw new IllegalArgumentException("IdleTime must be greater than zero!");
		}
		this.idleTime = idleTime;
	}

	/**
	 * Offers an event to the queue for asynchronous processing.
	 *
	 * @param e the event; must not be null
	 * @throws IllegalStateException if the container is not started
	 * @throws IllegalArgumentException if {@code e} is null
	 */
	public void sendEvent(IEvent e) {
		if (!isStarted) {
			throw new IllegalStateException("StagedContianer has not yet started!");
		}
		if (e == null) {
			throw new IllegalArgumentException("event is null!");
		}
		// Read the queue once: a concurrent stop() may null the field between
		// the isStarted check above and the offer below.
		IQueue<IEvent> target = getQueue();
		if (target == null) {
			throw new IllegalStateException("StagedContianer has not yet started!");
		}
		target.offer(e);
	}
	
	/**
	 * Starts the queue and the dispatcher thread. If no queue or executor has
	 * been set, defaults are created. Calling this on an already started
	 * container only logs a warning; previously it leaked a second dispatcher
	 * thread.
	 */
	public synchronized void start() {
		if (isStarted) {
			log.warn("StagedContainer[{0}] is already started!", new Object[]{getId()});
			return;
		}
		
		if (queue == null) {
			// Default queue. NOTE(review): the directory is hard-coded and
			// Windows-specific; inject a queue via setQueue() to avoid it.
			queue = new PersistenceQueue<IEvent>(3000, new DirPersistence<IEvent>("d:/test/queue"));
		}
		
		queue.start();
		
		if (executor == null) {
			// Default executor: fixed pool of 10 threads named after this container.
			executor = Executors.newFixedThreadPool(10, new ThreadFactoryWithName("StagedContainer:" + getId()));
		}
		
		eventHandelMainThread = new EventHandelMainThread(getId(), getQueue(), getExecutor(), 
				new IEventHandler() {
					public void handleEvent(IEvent event) {
						String eventId = event.getId();
						// Skip, and un-register, events that were cancelled while queued.
						if (CancelEventCache.containCancelEvent(eventId)) {
							CancelEventCache.removeCancelEvent(eventId);
							log.warn("Event[{0}] is cancel!", new Object[]{eventId});
							return;
						}
						
						try {
							// Handler is looked up per event, so a handler set after start() is honoured.
							IEventHandler eventHandler = getEventHandler();
							if (eventHandler != null) {
								eventHandler.handleEvent(event);
							}
						} finally {
							// Route even when the handler throws.
							IEventRouter eventRouter = getEventRouter();
							if (eventRouter != null) {
								eventRouter.route(event);
							}
						}						
					}
			
		}, idleTime);
		eventHandelMainThread.start();
		isStarted = true;     
	}
	
	/**
	 * Stops the dispatcher, the executor (if it is an {@link ExecutorService})
	 * and the queue, then clears them so a later {@link #start()} creates fresh
	 * defaults. Safe to call before {@link #start()} or more than once.
	 */
	public synchronized void stop() {
		// Reject new events before tearing anything down.
		isStarted = false;
		try {
			// Signal the dispatcher first so it stops feeding the executor
			// before the executor is shut down.
			if (eventHandelMainThread != null) {
				eventHandelMainThread.shutdownThread();
			}
			
			if (executor instanceof ExecutorService) {
				ExecutorService es = (ExecutorService) executor;
				try {
					es.shutdownNow();
				} catch (Exception e) {
					// Best effort: fall back to an orderly shutdown.
					try {
						es.shutdown();
					} catch (Exception ignore) {
						// Nothing more can be done to stop this executor.
					}
				}
			}
			
			if (queue != null) {
				queue.stop();
			}
		} finally {
			// Always drop the references, even if a component failed to stop,
			// so a restart never reuses a half-stopped executor or queue.
			executor = null;	
			eventHandelMainThread = null;
			queue = null;
		}
	}
	
	/**
	 * Dispatcher thread: polls the queue and submits one task per event to the
	 * executor, sleeping for {@code idleTime} milliseconds when the queue is empty.
	 */
	static class EventHandelMainThread extends Thread {
		
		// Set by shutdownThread() from another thread and read in run(),
		// hence volatile.
		private volatile boolean isShutdown = false;
		
		private IQueue<IEvent> eventQueue = null;
		private Executor executor = null;
		private IEventHandler eventHandler = null;
		private int idleTime;
		
		/**
		 * @param name thread name
		 * @param eventQueue queue to poll for events
		 * @param executor executor that runs the handler for each event
		 * @param eventHandler handler invoked on the executor for each event
		 * @param idleTime milliseconds to sleep when the queue is empty
		 */
		public EventHandelMainThread(String name, IQueue<IEvent> eventQueue, Executor executor, IEventHandler eventHandler, int idleTime) {
			super(name);
			this.eventQueue = eventQueue;
			this.executor = executor;
			this.eventHandler = eventHandler;
			this.idleTime = idleTime;
		}
		
		@Override
		public void run() {
			while (!isShutdown) {				
				final IEvent event = eventQueue.poll();
				// Nothing queued: back off before polling again.
				if (event == null) {
					try {
						Thread.sleep(idleTime);
					} catch (InterruptedException e) {
						// Deliberately not re-interrupting: the interrupt is only a
						// wake-up from shutdownThread(), and the loop condition
						// re-checks isShutdown. Restoring the flag would make every
						// later sleep fail immediately and spin the loop.
					}
					continue;
				}
				try {
					executor.execute(new Runnable(){
						public void run() {
							eventHandler.handleEvent(event);
						}						
					});
				} catch (java.util.concurrent.RejectedExecutionException e) {
					// The executor may already be shut down while the container is
					// stopping; that is expected. Any other rejection is rethrown
					// as before.
					if (!isShutdown) {
						throw e;
					}
					log.warn("Event[{0}] is dropped because the container is stopping!", new Object[]{event.getId()});
				}
			}
		}
		
		/**
		 * Asks the dispatcher to exit and interrupts it in case it is sleeping.
		 */
		public void shutdownThread() {
			isShutdown = true;
			this.interrupt();
		}		
	}
}

/**
 * Process-wide registry of cancelled event ids, backed by a
 * {@link ConcurrentHashMap}. Null or blank ids are ignored by every method.
 *
 * @author yourname (mailto:[email protected])
 */
public class CancelEventCache {
	// Shared placeholder value: only the keys of the map carry information.
	private static final Object PRESENT = new Object();
	private static final ConcurrentHashMap<String, Object> cancelled = new ConcurrentHashMap<String, Object>();

	/**
	 * Marks the given event id as cancelled.
	 *
	 * @param eventId the event id; null or blank ids are ignored
	 */
	public static void addCancelEvent(String eventId) {
		if (!isBlank(eventId)) {
			cancelled.put(eventId, PRESENT);
		}
	}

	/**
	 * Removes the cancellation mark for the given event id, if present.
	 *
	 * @param eventId the event id; null or blank ids are ignored
	 */
	public static void removeCancelEvent(String eventId) {
		if (!isBlank(eventId)) {
			cancelled.remove(eventId);
		}
	}

	/**
	 * Tells whether the given event id is currently marked as cancelled.
	 *
	 * @param eventId the event id
	 * @return true if the id is marked; false otherwise, including for null
	 *         or blank ids
	 */
	public static boolean containCancelEvent(String eventId) {
		return !isBlank(eventId) && cancelled.containsKey(eventId);
	}

	// True when the id is null or contains nothing but whitespace.
	private static boolean isBlank(String eventId) {
		return eventId == null || eventId.trim().length() == 0;
	}
}

/**
 * Process-wide registry of one-shot {@link IEventCallback}s keyed by event id.
 * A callback is handed out at most once: {@link #getEventCallback(String)}
 * removes it from the registry.
 *
 * @author yourname (mailto:[email protected])
 */
public class EventCallbackManager {
	private static final ConcurrentHashMap<String, IEventCallback> conMap = new ConcurrentHashMap<String, IEventCallback>();	
	
	/**
	 * Removes and returns the callback registered for the given event id.
	 * <p>
	 * A single atomic {@code remove} is used, so when several threads ask for
	 * the same id only one of them receives the callback. The previous
	 * {@code get} followed by {@code remove} could hand the same callback to
	 * more than one caller.
	 *
	 * @param eventId the event id
	 * @return the registered callback, or null if the id is null, blank or
	 *         has no callback registered
	 */
	public static IEventCallback getEventCallback(String eventId) {
		if (eventId == null || eventId.trim().length() == 0) {
			return null;
		}
		return conMap.remove(eventId);
	}
	
	/**
	 * Registers a callback for the given event id, replacing any callback
	 * already registered for it. The call is ignored if the id is null or
	 * blank, or if the callback is null.
	 *
	 * @param eventId the event id
	 * @param callback the callback to register
	 */
	public static void register(String eventId, IEventCallback callback) {
		if (eventId == null || eventId.trim().length() == 0 || callback == null) {
			return;
		}
		conMap.put(eventId, callback);
	}
	
	/**
	 * Removes every registered callback.
	 */
	public static void clear() {
		conMap.clear();
	}
}

/**
 *            
 *
 * @author yourname (mailto:[email protected])
 */
public class ThreadFactoryWithName implements ThreadFactory {
	
	static final AtomicInteger poolNumber = new AtomicInteger(1);

	final ThreadGroup group;

	final AtomicInteger threadNumber = new AtomicInteger(1);

	final String namePrefix;
	
	final boolean isDaemon;

	public ThreadFactoryWithName(String name) {
		this(name, false);
	}
	
	public ThreadFactoryWithName(String name, boolean isDaemon) {
		SecurityManager s = System.getSecurityManager();
		group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
		namePrefix = name == null ? "Seda-Default" : name + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
		this.isDaemon = isDaemon;
	}

	public Thread newThread(Runnable r) {
		Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());		
		t.setDaemon(isDaemon);

		if (t.getPriority() != Thread.NORM_PRIORITY) {
			t.setPriority(Thread.NORM_PRIORITY);
		}
		return t;
	}
}

좋은 웹페이지 즐겨찾기