python tornado 통합 redis 메시지 구독 비동기 작업 후 tornado 메 인 프로그램 을 시작 할 수 없습니다.

3201 단어 서버MQpython
redis 메시지 구독 의 비동기 작업 을 사용 한 후,tornado 메 인 프로그램 은 CacheQueue 3 를 사용 해 야 문 제 를 해결 할 수 있 습 니 다.구체 적 인 원인 은 나중에 소스 코드 를 자세히 살 펴 보 겠 습 니 다.
CacheQueue1
import redis
import logging

class CacheQueue(object):

    def __init__(self, host, port, cache_update_path):
        self._pool_cache = redis.ConnectionPool(host=host,  port=port, db=0)
		
	@gen.coroutine
	def listen():
        r = redis.StrictRedis(connection_pool=self._pool_cache)
        p = r.pubsub()
        p.subscribe("cache_update_path")
        for item in p.listen():
			if item['type'] == 'message':
                    logging.info(item['data'])

CacheQueue2
import redis
import logging
from threading import Thread

class CacheQueue(object):
    def __init__(self, host, port, cache_update_path):
        self._pool_cache = redis.ConnectionPool(host=host,  port=port, db=0)
		

	def async_listen():
		thead_sub = Thread(target=self.listen, args=(byte,))
		thead_sub.start()


	def listen():
        r = redis.StrictRedis(connection_pool=self._pool_cache)
        p = r.pubsub()
        p.subscribe("cache_update_path")
        for item in p.listen():
			if item['type'] == 'message':
                    logging.info(item['data'])

CacheQueue3
import redis
import logging
from concurrent import futures
executor = futures.ThreadPoolExecutor(max_workers=1)
class CacheQueue(object):
    def __init__(self, host, port, cache_update_path):
        self._pool_cache = redis.ConnectionPool(host=host,  port=port, db=10)
        self.cache_update_path =cache_update_path


    def _listen(self,pool_cache):
        logging.info('async ==> redis_cache')
        r = redis.StrictRedis(connection_pool=pool_cache)
        p = r.pubsub()
        p.subscribe("cache_update_path")
        for item in p.listen():
            logging.info('get publish content')
            print item
            if item['type'] == 'message':
                logging.info(item['data'])
                self.cache_update_path.add(item['data'])


    def async_listen(self):
        executor.submit(self._listen, self._pool_cache)

호출 코드 세 션
class Application(tornado.web.Application):
	def __init__(self, settings=None, script=None):
		self.cache_queue = CacheQueue(host=self.settings.get('redis_host', 'localhost'),
                                              port=self.settings.get('redis_port', 6379),
                                                cache_update_path=self.cache_update_path)
		tornado.web.Application.__init__(self, handlers, **self.settings)
		
		#   :    redis           ,tornado        
		#self.cache_queue.listen()    #    CacheQueue 1 :   
		
		#    CacheQueue 2 :     
		#    CacheQueue 3 :     
		self.cache_queue.async_listen()

def main():
		...     
	app = Application(settings=settings, script=script)
	app.port = port
	http_server = tornado.httpserver.HTTPServer(app, xheaders=True)
    http_server.listen(options.port)
	....
	tornado.ioloop.IOLoop.instance().start()

좋은 웹페이지 즐겨찾기