간단한 SEDA 프레임워크 구현
10797 단어 code
/**
*
*
* @author wuyuhou
*
*/
public interface IStagedContainer {
//
String getId();
//
void sendEvent(IEvent e);
//
void start();
//
void stop();
}
/**
*
*
* @author yourname (mailto:[email protected])
*/
public interface IEvent {
String getId();
<T> T getData();
void setData(Object data);
Throwable getException();
void setException(Throwable exception);
}
/**
*
*
* @author yourname (mailto:[email protected])
*/
public interface IEventHandler {
/**
* ,
*
* @param event
*/
void handleEvent(IEvent event);
}
/**
*
*
* @author wuyuhou
*
*/
public interface IEventCallback {
/**
*
*
* @param event
*/
void callback(IEvent event);
}
/**
*
*
* @author wuyuhou
*
*/
public interface IEventRouter {
/**
*
*
* @param event
*/
void route(IEvent event);
}
/**
*
*
* @author yourname (mailto:[email protected])
*/
public class StagedContainer implements IStagedContainer {
private static final ILogger log = DebugLoggerFactory.getLogger(StagedContainer.class);
//
private String id = null;
// ( )
private Executor executor = null;
// ( )
private IQueue<IEvent> queue = null;
//
private IEventHandler eventHandler = null;
//
private IEventRouter eventRouter = null;
//
private EventHandelMainThread eventHandelMainThread = null;
// ,
private int idleTime = 1000;
private boolean isStarted = false;
/**
*
*
*
*/
public StagedContainer(String id) {
if (id == null || id.trim().length() == 0) {
throw new IllegalArgumentException("StagedContainerId is null!");
}
this.id = id;
}
public String getId() {
return id;
}
protected Executor getExecutor() {
return executor;
}
protected IQueue<IEvent> getQueue() {
return queue;
}
protected void setExecutor(Executor executor) {
if (executor == null) {
throw new IllegalArgumentException("executor is null!");
}
this.executor = executor;
}
protected void setQueue(IQueue<IEvent> queue) {
if (queue == null) {
throw new IllegalArgumentException("queue is null!");
}
this.queue = queue;
}
public IEventHandler getEventHandler() {
return eventHandler;
}
public void setEventHandler(IEventHandler eventHandler) {
if (eventHandler == null) {
throw new IllegalArgumentException("eventHandler is null!");
}
this.eventHandler = eventHandler;
}
public IEventRouter getEventRouter() {
return eventRouter;
}
public void setEventRouter(IEventRouter eventRouter) {
if (eventRouter == null) {
throw new IllegalArgumentException("eventRouter is null!");
}
this.eventRouter = eventRouter;
}
public int getIdleTime() {
return idleTime;
}
public void setIdleTime(int idleTime) {
if (idleTime <= 0) {
throw new IllegalArgumentException("IdleTime is not less than zero!");
}
this.idleTime = idleTime;
}
//
public void sendEvent(IEvent e) {
if (!isStarted) {
throw new IllegalStateException("StagedContianer has not yet started!");
}
if (e == null) {
throw new IllegalArgumentException("event is null!");
}
getQueue().offer(e);
}
public void start() {
if (queue == null) {
//
queue = new PersistenceQueue<IEvent>(3000, new DirPersistence<IEvent>("d:/test/queue"));
}
queue.start();
if (executor == null) {
// 10
executor = Executors.newFixedThreadPool(10, new ThreadFactoryWithName("StagedContainer:" + getId()));
}
eventHandelMainThread = new EventHandelMainThread(getId(), getQueue(), getExecutor(),
new IEventHandler() {
public void handleEvent(IEvent event) {
String eventId = event.getId();
//
if (CancelEventCache.containCancelEvent(eventId)) {
CancelEventCache.removeCancelEvent(eventId);
log.warn("Event[{0}] is cancel!", new Object[]{eventId});
return;
}
try {
//
IEventHandler eventHandler = getEventHandler();
if (eventHandler != null) {
eventHandler.handleEvent(event);
}
} finally {
//
IEventRouter eventRouter = getEventRouter();
if (eventRouter != null) {
eventRouter.route(event);
}
}
}
}, idleTime);
eventHandelMainThread.start();
isStarted = true;
}
public void stop() {
if (executor != null) {
//
if (executor instanceof ExecutorService) {
ExecutorService es = (ExecutorService) executor;
try {
es.shutdownNow();
} catch (Exception e) {
try {
es.shutdown();
}catch (Exception ignore) {
}
}
}
}
eventHandelMainThread.shutdownThread();
if (queue != null) {
queue.stop();
}
executor = null;
eventHandelMainThread = null;
queue = null;
isStarted = false;
}
//
static class EventHandelMainThread extends Thread {
private boolean isShutdown = false;
private IQueue<IEvent> eventQueue = null;
private Executor executor = null;
private IEventHandler eventHandler = null;
private int idleTime;
public EventHandelMainThread(String name, IQueue<IEvent> eventQueue, Executor executor, IEventHandler eventHandler, int idleTime) {
super(name);
this.eventQueue = eventQueue;
this.executor = executor;
this.eventHandler = eventHandler;
this.idleTime = idleTime;
}
@Override
public void run() {
while (!isShutdown) {
final IEvent event = eventQueue.poll();
//
if (event == null) {
try {
Thread.sleep(idleTime);
} catch (InterruptedException e) {
}
continue;
}
executor.execute(new Runnable(){
public void run() {
eventHandler.handleEvent(event);
}
});
}
}
//
public void shutdownThread() {
isShutdown = true;
this.interrupt();
}
}
}
/**
*
*
* @author yourname (mailto:[email protected])
*/
public class CancelEventCache {
private static Object OBJECT = new Object();
private static ConcurrentHashMap<String, Object> eventMap = new ConcurrentHashMap<String, Object>();
public static void addCancelEvent(String eventId) {
if (eventId == null || eventId.trim().length() == 0) {
return;
}
eventMap.put(eventId, OBJECT);
}
public static void removeCancelEvent(String eventId) {
if (eventId == null || eventId.trim().length() == 0) {
return;
}
eventMap.remove(eventId);
}
public static boolean containCancelEvent(String eventId) {
if (eventId == null || eventId.trim().length() == 0) {
return false;
}
return eventMap.containsKey(eventId);
}
}
/**
* Callback
*
* @author yourname (mailto:[email protected])
*/
public class EventCallbackManager {
private static ConcurrentHashMap<String, IEventCallback> conMap = new ConcurrentHashMap<String, IEventCallback>();
public static IEventCallback getEventCallback(String eventId) {
if (eventId == null || eventId.trim().length() == 0) {
return null;
}
IEventCallback callback = conMap.get(eventId);
conMap.remove(eventId);
return callback;
}
public static void register(String eventId, IEventCallback callback) {
if (eventId == null || eventId.trim().length() == 0 || callback == null) {
return;
}
conMap.put(eventId, callback);
}
public static void clear() {
conMap.clear();
}
}
/**
*
*
* @author yourname (mailto:[email protected])
*/
public class ThreadFactoryWithName implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
final boolean isDaemon;
public ThreadFactoryWithName(String name) {
this(name, false);
}
public ThreadFactoryWithName(String name, boolean isDaemon) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
namePrefix = name == null ? "Seda-Default" : name + "-pool-" + poolNumber.getAndIncrement() + "-thread-";
this.isDaemon = isDaemon;
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(isDaemon);
if (t.getPriority() != Thread.NORM_PRIORITY) {
t.setPriority(Thread.NORM_PRIORITY);
}
return t;
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
소스 코드가 포함된 Python 프로젝트텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.