OpenStack-RPC-server 구축 (4)

이 장에서 우리는 oslo 를 통해 분석한다메시지 층에서 아래로 consumer 코드 프로세스를 만듭니다. consumer를 만들기 전에 윗글에서 말한 connection을 매개 변수로 AMQPlistener 대상을 만들고, 이 대상을 consumer를 만드는 콜백 매개 변수에 부여합니다. 따라서 메시지가 도착할 때 AMQPlistener 클래스의 를 호출합니다.call__방법여기서 주의해야 할 것은conn=self.get_connection(rpc amqp.PURPOSE LISTEN), 이 대상은 ConnectionContext 대상이며, ConnectionContext 클래스는 사실 모든 oslo 에 해당합니다.메시지 층의connection 의뢰 에이전트 클래스입니다.
# usr/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.py
def listen(self, target):
        conn = self._get_connection(rpc_amqp.PURPOSE_LISTEN)

        listener = AMQPListener(self, conn)

        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic=target.topic,
                                    callback=listener)
        conn.declare_topic_consumer(exchange_name=self._get_exchange(target),
                                    topic='%s.%s' % (target.topic,
                                                     target.server),
                                    callback=listener)
        conn.declare_fanout_consumer(target.topic, listener)

    return listener
class AMQPListener(base.Listener):

    def __init__(self, driver, conn):
        super(AMQPListener, self).__init__(driver)
        self.conn = conn
        self.msg_id_cache = rpc_amqp._MsgIdCache()
        self.incoming = []
        self._stopped = threading.Event()
    #usr/lib/python2.7/site-packages/oslo_messaging/_drivers/amqp.py:ConnectionContext
    def __getattr__(self, key):
        """Proxy all other calls to the Connection instance."""
        if self.connection:
            return getattr(self.connection, key)
        else:
            raise rpc_common.InvalidRPCConnectionReuse()

상술한 설명에 의하면, 우리는 ConnectionContext 클래스가 사실 모든 oslo 에 해당한다는 것을 안다메시지 층의connection 의뢰 에이전트 클래스입니다.ConnectionContext 객체를 사용하여 존재하지 않는 속성에 접근할 때(예를 들어 conn.declare topic consumer(...) ConnectionContext 클래스의 를 호출합니다getattr__메서드,getattr__방법은 실제적으로 스크롤 (fallback) 방법으로 어떤 속성을 찾지 못했을 때만 호출됩니다.
그래서 우리가 conn.declare를 실행할 때topic_consumer (...) 방법은 ConnectionContext 클래스에 declare 가 없기 때문에topic_consumer 메서드, 따라서 실행getattr__방법우리 앞의 분석에 의하면self.connection은/usr/lib/python2입니다.7/site-packages/oslo_messaging/_drivers/impl_rabbit.py의 Connection 객체는 Connection 객체의 declare 로 반환됩니다.topic_consumer 주소 (getattr (self.connection, 키) 는 여기의 예를 들어 키가 여기에declaretopic_consumer), 그리고 conn.declaretopic_consumer (...) 시 해당하는 매개 변수를 전달하여 해당하는 작업을 수행합니다.
    #/usr/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.py
    def declare_topic_consumer(self, exchange_name, topic, callback=None,
                               queue_name=None):
        """Create a 'topic' consumer."""
        self.declare_consumer(functools.partial(TopicConsumer,
                                                name=queue_name,
                                                exchange_name=exchange_name,
                                                ),
                              topic, callback)

    def declare_fanout_consumer(self, topic, callback):
        """Create a 'fanout' consumer."""
        self.declare_consumer(FanoutConsumer, topic, callback)

    def declare_consumer(self, consumer_cls, topic, callback):
        """Create a Consumer using the class that was passed in and
        add it to our list of consumers
        """

        def _connect_error(exc):
            log_info = {'topic': topic, 'err_str': exc}
            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
                      "%(err_str)s"), log_info)

        def _declare_consumer():
            # self.consumer_num=count(1), , next ,
            # 1, next , 。 channel 
            # consumer tag。 Nova-scheduler , channel 
            # consumer, tag 1,2 3。
            consumer = consumer_cls(self.driver_conf, self.channel, topic,
                                    callback, six.next(self.consumer_num))
            self.consumers.append(consumer)
            return consumer

        with self._connection_lock:
            return self.ensure(_declare_consumer,
                               error_callback=_connect_error)

먼저 functools를 설명합니다.partial ()의 사용법, 함수 partial () 은 한 개 이상의 매개 변수에 고정된 값을 지정하여 나중에 호출할 매개 변수의 수를 줄이고 새로운 호출 대상을 되돌려줍니다.그래서 declaretopic_consumer 방법이 declare 에 전달됨consumer 방법의 consumercls는 TopicConsumer 메서드 이름이며 메서드의 name 및 exchangename 매개 변수는partial () 함수를 이용하여 미리 설정되었습니다.그래서declare_consumer () 방법에서 TopicConsumer 대상을 구성할 때,name와 exchange 를 지정할 필요가 없습니다.name 매개 변수.겸사겸사 한마디 하자면, declaretopic_consumer 방법의 exchangename=self._get_exchange(target)
#/usr/lib/python2.7/site-packages/oslo_messaging/_drivers/amqpdriver.py
def _get_exchange(self, target):
        return target.exchange or self._default_exchange

저희가 oslo에서...메시지 층에서 RPC-server의Target를 만들 때 topic와 서버 파라미터만 지정하고 exchange 파라미터는 지정하지 않았습니다 (exchange 파라미터는 Target를 구성할 때 선택할 수 있습니다). 따라서self.default_exchange의 값입니다. 이 값은 RabbitDriver 대상을 구성하여 전달된 것입니다. 이 exchange의 값은 'nova' 입니다.
저희는 계속해서 declare 로 돌아가겠습니다.topic_consumer 및 declarefanout_consumer 방법, 여기listen 방법에서declare 를 호출합니다topic_두 번, 콜백 파라미터는 AMQPlistener 대상이고Nova-scheduler 구성 요소를 보면 두 개의 콜백 파라미터는 각각 scheduler와 scheduler이다.jun(target.topic=scheduler,target.server=jun).그리고 declaretopic_consumer 및 declarefanout_consumer 방법은 모두 declare 를 호출합니다.consumer 메서드, declareconsumer 메서드는 oslo 를 호출합니다.메시지 층의 ensure 방법, ensure 방법의 코드 프로세스에 대해 한 문장을 보십시오. 우리의connection과 채널이 이미 만들어졌기 때문에 다시 만들지 않습니다. 갈고리 함수executemethod(oslo messaging 레이어) 리콜백 실행declare_consumer 방법,declare_consumer 방법은 상응하는 consumer 대상을 구성합니다.topic와fanout의consumer 대상을 구성하는 코드 프로세스가 비슷하기 때문에 우리는 주로 topic 방식의 코드 프로세스를 분석한다.
#/usr/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.py
class TopicConsumer(ConsumerBase):
    """Consumer class for 'topic'."""

    def __init__(self, conf, channel, topic, callback, tag, exchange_name,
                 name=None, **kwargs):
        """Init a 'topic' queue.

        :param channel: the amqp channel to use
        :param topic: the topic to listen on
        :paramtype topic: str
        :param callback: the callback to call when messages are received
        :param tag: a unique ID for the consumer on the channel
        :param exchange_name: the exchange name to use
        :param name: optional queue name, defaults to topic
        :paramtype name: str

        Other kombu options may be passed as keyword arguments
        """
        # Default options
        options = {'durable': conf.amqp_durable_queues,
                   'queue_arguments': _get_queue_arguments(conf),
                   'auto_delete': conf.amqp_auto_delete,
                   'exclusive': False}
        options.update(kwargs)
        exchange = kombu.entity.Exchange(name=exchange_name,
                                         type='topic',
                                         durable=options['durable'],
                                         auto_delete=options['auto_delete'])
        super(TopicConsumer, self).__init__(channel,
                                            callback,
                                            tag,
                                            name=name or topic,
                                            exchange=exchange,
                                            routing_key=topic,
                                            **options)

Topic 방식의 consumer에서 작성한 코드 프로세스는 크게 세 부분으로 나뉘어 있습니다.
a. kombu층의 exchange 대상 생성
b. kombu층의queue 대상의 생성.
c. kombu층의 exchange와queue 대상에 대한 정보를 통해amqp층의 exchange와queue 대상을 만들고amqp층의 exchange와queue 대상을 연결합니다.
#/usr/lib/python2.7/site-packages/kombu/entity.py
class Exchange(MaybeChannelBound):
    """An Exchange declaration.

    :keyword name: See :attr:`name`.
    :keyword type: See :attr:`type`.
    :keyword channel: See :attr:`channel`.
    :keyword durable: See :attr:`durable`.
    :keyword auto_delete: See :attr:`auto_delete`.
    :keyword delivery_mode: See :attr:`delivery_mode`.
    :keyword arguments: See :attr:`arguments`.

    .. attribute:: name

        Name of the exchange. Default is no name (the default exchange).

    .. attribute:: type

        AMQP defines four default exchange types (routing algorithms) that
        covers most of the common messaging use cases. An AMQP broker can
        also define additional exchange types, so see your broker
        manual for more information about available exchange types.

            * `direct` (*default*)

                Direct match between the routing key in the message, and the
                routing criteria used when a queue is bound to this exchange.

            * `topic`

                Wildcard match between the routing key and the routing pattern
                specified in the exchange/queue binding. The routing key is
                treated as zero or more words delimited by `"."` and
                supports special wildcard characters. `"*"` matches a
                single word and `"#"` matches zero or more words.

            * `fanout`

                Queues are bound to this exchange with no arguments. Hence any
                message sent to this exchange will be forwarded to all queues
                bound to this exchange.

            * `headers`

                Queues are bound to this exchange with a table of arguments
                containing headers and values (optional). A special argument
                named "x-match" determines the matching algorithm, where
                `"all"` implies an `AND` (all pairs must match) and
                `"any"` implies `OR` (at least one pair must match).

                :attr:`arguments` is used to specify the arguments.

            This description of AMQP exchange types was shamelessly stolen
            from the blog post `AMQP in 10 minutes: Part 4`_ by
            Rajith Attapattu. This article is recommended reading.

            .. _`AMQP in 10 minutes: Part 4`:
                http://bit.ly/amqp-exchange-types

    .. attribute:: channel

        The channel the exchange is bound to (if bound).

    .. attribute:: durable

        Durable exchanges remain active when a server restarts. Non-durable
        exchanges (transient exchanges) are purged when a server restarts.
        Default is :const:`True`.

    .. attribute:: auto_delete

        If set, the exchange is deleted when all queues have finished
        using it. Default is :const:`False`.

    .. attribute:: delivery_mode

        The default delivery mode used for messages. The value is an integer,
        or alias string.

            * 1 or `"transient"`

                The message is transient. Which means it is stored in
                memory only, and is lost if the server dies or restarts.

            * 2 or "persistent" (*default*)
                The message is persistent. Which means the message is
                stored both in-memory, and on disk, and therefore
                preserved if the server dies or restarts.

        The default value is 2 (persistent).

    .. attribute:: arguments

        Additional arguments to specify when the exchange is declared.

    """
    TRANSIENT_DELIVERY_MODE = TRANSIENT_DELIVERY_MODE
    PERSISTENT_DELIVERY_MODE = PERSISTENT_DELIVERY_MODE

    name = ''
    type = 'direct'
    durable = True
    auto_delete = False
    passive = False
    delivery_mode = PERSISTENT_DELIVERY_MODE

    attrs = (
        ('name', None),
        ('type', None),
        ('arguments', None),
        ('durable', bool),
        ('passive', bool),
        ('auto_delete', bool),
        ('delivery_mode', lambda m: DELIVERY_MODES.get(m) or m),
    )

    def __init__(self, name='', type='', channel=None, **kwargs):
        super(Exchange, self).__init__(**kwargs)
        self.name = name or self.name
        self.type = type or self.type
        self.maybe_bind(channel)
#/usr/lib/python2.7/site-packages/kombu/abstract.py
def maybe_bind(self, channel):
        """Bind instance to channel if not already bound."""
        if not self.is_bound and channel:
            self._channel = maybe_channel(channel)
            self.when_bound()
            self._is_bound = True
        return self

상기 코드는kombu층을 만드는 exchange 대상의 코드입니다. 그 중에서Nova-scheduler 구성 요소의 경우name는 각각scheduler와scheduler입니다.jun (두 개의 topic 방식의consumer가 있기 때문에) 여기에서 채널 매개 변수는 전달되지 않기 때문에 여기서 채널 매개 변수는 기본값인 None을 사용하기 때문에self로 직접 되돌아옵니다.그러나kombu층을 만든queue는amqp층의channel로 되돌아갈 것이며 다음에 분석할 것입니다.
kombu층의 exchange를 만들면 TopicConsumer 클래스는 부모 클래스의 초기화 코드 ( init) 를 호출합니다.이 코드에는 kombu층을 만드는queue 대상,amqp층의 exchange와queue 대상의 창설과 귀속이 포함되어 있습니다.
#/usr/lib/python2.7/site-packages/oslo_messaging/_drivers/impl_rabbit.py
class ConsumerBase(object):
    """Consumer base class."""

    def __init__(self, channel, callback, tag, **kwargs):
        """Declare a queue on an amqp channel.

        'channel' is the amqp channel to use
        'callback' is the callback to call when messages are received
        'tag' is a unique ID for the consumer on the channel

        queue name, exchange name, and other kombu options are
        passed in here as a dictionary.
        """
        self.callback = callback
        self.tag = six.text_type(tag)
        self.kwargs = kwargs
        self.queue = None
        self.reconnect(channel)

    def reconnect(self, channel):
        """Re-declare the queue after a rabbit reconnect."""
        self.channel = channel
        self.kwargs['channel'] = channel
        self.queue = kombu.entity.Queue(**self.kwargs)
        try:
            self.queue.declare()
        except Exception as e:
            # NOTE: This exception may be triggered by a race condition.
            # Simply retrying will solve the error most of the time and
            # should work well enough as a workaround until the race condition
            # itself can be fixed.
            # TODO(jrosenboom): In order to be able to match the Exception
            # more specifically, we have to refactor ConsumerBase to use
            # 'channel_errors' of the kombu connection object that
            # has created the channel.
            # See https://bugs.launchpad.net/neutron/+bug/1318721 for details.
            LOG.error(_("Declaring queue failed with (%s), retrying"), e)
            self.queue.declare()

먼저 kombu층의queue 대상을 만듭니다.코드 프로세스는 다음과 같습니다.
#/usr/lib/python2.7/site-packages/kombu/entity.py
class Queue(MaybeChannelBound):
    """A Queue declaration.

    :keyword name: See :attr:`name`.
    :keyword exchange: See :attr:`exchange`.
    :keyword routing_key: See :attr:`routing_key`.
    :keyword channel: See :attr:`channel`.
    :keyword durable: See :attr:`durable`.
    :keyword exclusive: See :attr:`exclusive`.
    :keyword auto_delete: See :attr:`auto_delete`.
    :keyword queue_arguments: See :attr:`queue_arguments`.
    :keyword binding_arguments: See :attr:`binding_arguments`.
    :keyword on_declared: See :attr:`on_declared`

    .. attribute:: name

        Name of the queue. Default is no name (default queue destination).

    .. attribute:: exchange

        The :class:`Exchange` the queue binds to.

    .. attribute:: routing_key

        The routing key (if any), also called *binding key*.

        The interpretation of the routing key depends on
        the :attr:`Exchange.type`.

            * direct exchange

                Matches if the routing key property of the message and
                the :attr:`routing_key` attribute are identical.

            * fanout exchange

                Always matches, even if the binding does not have a key.

            * topic exchange

                Matches the routing key property of the message by a primitive
                pattern matching scheme. The message routing key then consists
                of words separated by dots (`"."`, like domain names), and
                two special characters are available; star (`"*"`) and hash
                (`"#"`). The star matches any word, and the hash matches
                zero or more words. For example `"*.stock.#"` matches the
                routing keys `"usd.stock"` and `"eur.stock.db"` but not
                `"stock.nasdaq"`.

    .. attribute:: channel

        The channel the Queue is bound to (if bound).

    .. attribute:: durable

        Durable queues remain active when a server restarts.
        Non-durable queues (transient queues) are purged if/when
        a server restarts.
        Note that durable queues do not necessarily hold persistent
        messages, although it does not make sense to send
        persistent messages to a transient queue.

        Default is :const:`True`.

    .. attribute:: exclusive

        Exclusive queues may only be consumed from by the
        current connection. Setting the 'exclusive' flag
        always implies 'auto-delete'.

        Default is :const:`False`.

    .. attribute:: auto_delete

        If set, the queue is deleted when all consumers have
        finished using it. Last consumer can be cancelled
        either explicitly or because its channel is closed. If
        there was no consumer ever on the queue, it won't be
        deleted.

    .. attribute:: queue_arguments

        Additional arguments used when declaring the queue.

    .. attribute:: binding_arguments

        Additional arguments used when binding the queue.

    .. attribute:: alias

        Unused in Kombu, but applications can take advantage of this.
        For example to give alternate names to queues with automatically
        generated queue names.

    .. attribute:: on_declared

        Optional callback to be applied when the queue has been
        declared (the ``queue_declare`` method returns).
        This must be function with a signature that accepts at least 3
        positional arguments: ``(name, messages, consumers)``.

    """
    name = ''
    exchange = Exchange('')
    routing_key = ''

    durable = True
    exclusive = False
    auto_delete = False
    no_ack = False

    attrs = (
        ('name', None),
        ('exchange', None),
        ('routing_key', None),
        ('queue_arguments', None),
        ('binding_arguments', None),
        ('durable', bool),
        ('exclusive', bool),
        ('auto_delete', bool),
        ('no_ack', None),
        ('alias', None),
        ('bindings', list),
    )

    def __init__(self, name='', exchange=None, routing_key='',
                 channel=None, bindings=None, on_declared=None,
                 **kwargs):
        super(Queue, self).__init__(**kwargs)
        self.name = name or self.name
        self.exchange = exchange or self.exchange
        self.routing_key = routing_key or self.routing_key
        self.bindings = set(bindings or [])
        self.on_declared = on_declared

        # allows Queue('name', [binding(...), binding(...), ...])
        if isinstance(exchange, (list, tuple, set)):
            self.bindings |= set(exchange)
        if self.bindings:
            self.exchange = None

        # exclusive implies auto-delete.
        if self.exclusive:
            self.auto_delete = True
        self.maybe_bind(channel)

#/usr/lib/python2.7/site-packages/kombu/abstract.py
def maybe_bind(self, channel):
        """Bind instance to channel if not already bound."""
        if not self.is_bound and channel:
            self._channel = maybe_channel(channel)
            self.when_bound()
            self._is_bound = True
        return self
#/usr/lib/python2.7/site-packages/kombu/connection.py
def maybe_channel(channel):
    """Returns channel, or returns the default_channel if it's a
    connection."""
    if isinstance(channel, Connection):
        return channel.default_channel
    return channel

kombu층의queue 대상을 만들 때 oslo메시지 층이 채널을 전달하고Nova-scheduler 구성 요소에 있어 채널 대상은 kombu층의 연결 대상이 아니며amqp층의 채널 대상이기 때문에default 를 만들지 않습니다채널 객체그래서 self.channel의 값은amqp층의 channel 대상입니다.뒤에 exchange와queue를 연결하면self.channel.
이 때 kombu층의queue 대상 생성이 완료되었습니다.amqp층의 exchange와queue 대상 생성 및 귀속을 설명합니다.그 코드는self입니다.queue.declare () (TopicConsumer 클래스의 부모 클래스의 초기화 코드에서) 가 터치합니다.
#/usr/lib/python2.7/site-packages/kombu/entity.py:Queue
def declare(self, nowait=False):
        """Declares the queue, the exchange and binds the queue to
        the exchange."""
        # - declare main binding.
        if self.exchange:
            self.exchange.declare(nowait)
        self.queue_declare(nowait, passive=False)

        if self.exchange and self.exchange.name:
            self.queue_bind(nowait)

        # - declare extra/multi-bindings.
        for B in self.bindings:
            B.declare(self.channel)
            B.bind(self, nowait=nowait)
        return self.name

    #/usr/lib/python2.7/site-packages/kombu/entity.py:Exchange
    def declare(self, nowait=False, passive=None):
        """Declare the exchange.

        Creates the exchange on the broker.

        :keyword nowait: If set the server will not respond, and a
            response will not be waited for. Default is :const:`False`.

        """
        passive = self.passive if passive is None else passive
        if self.name:
            return self.channel.exchange_declare(
                exchange=self.name, type=self.type, durable=self.durable,
                auto_delete=self.auto_delete, arguments=self.arguments,
                nowait=nowait, passive=passive,
            )

    #/usr/lib/python2.7/site-packages/kombu/entity.py:Queue
    def queue_declare(self, nowait=False, passive=False):
        """Declare queue on the server.

        :keyword nowait: Do not wait for a reply.
        :keyword passive: If set, the server will not create the queue.
            The client can use this to check whether a queue exists
            without modifying the server state.

        """
        ret = self.channel.queue_declare(queue=self.name,
                                         passive=passive,
                                         durable=self.durable,
                                         exclusive=self.exclusive,
                                         auto_delete=self.auto_delete,
                                         arguments=self.queue_arguments,
                                         nowait=nowait)
        if not self.name:
            self.name = ret[0]
        if self.on_declared:
            self.on_declared(*ret)
        return ret

    #/usr/lib/python2.7/site-packages/kombu/entity.py:Queue
    def queue_bind(self, nowait=False):
        """Create the queue binding on the server."""
        return self.bind_to(self.exchange, self.routing_key,
                            self.binding_arguments, nowait=nowait)

    def bind_to(self, exchange='', routing_key='',
                arguments=None, nowait=False):
        if isinstance(exchange, Exchange):
            exchange = exchange.name
        return self.channel.queue_bind(queue=self.name,
                                       exchange=exchange,
                                       routing_key=routing_key,
                                       arguments=arguments,
                                       nowait=nowait)

    # self.channel @property channel , exchange queue 
    # MaybeChannelBound 。 amqp channel 。 self._channel
    #/usr/lib/python2.7/site-packages/kombu/abstract.py
    @property
    def channel(self):
        """Current channel if the object is bound."""
        channel = self._channel
        if channel is None:
            raise NotBoundError(
                "Can't call method on %s not bound to a channel" % (
                    self.__class__.__name__))
        if isinstance(channel, ChannelPromise):
            channel = self._channel = channel()
        return channel

상기한 것은 kombu층을 통해amqp층을 만드는 exchange와queue 대상과 그들의 귀속 코드 프로세스입니다. 여기서 우리는amqp층의 코드를 붙이지 않습니다.주의해야 할 것은 두 topic 방식의consumer의 exchange 이름이 모두nova라는 것이다. 그러면amqp층의 exchange는 두 개의 nova 이름의 exchange를 만들 수 있습니까?답은 No입니다. 두 번째로 nova 이름의 exchange를 만들 때amqp층이 보호 메커니즘을 감지하기 때문입니다.그래서 노바 이름만 있는 exchange.그리고 두 topic 방법의queue는 이 exchange에 귀속됩니다.
상술한 것이 바로 topic 방식의 코드 프로세스이다. fanout 방식의 코드 프로세스는 topic 방식과 같지만 fanout 방식으로 만든 exchange와queue는 topic 방식과 다르다. fanout 방식의 exchange는 Nova-scheduler 구성 요소의 이름은 nova가 아니라 schedulerfanout,queue 이름은 schedulerfanout_xxxx(xxxx는 무작위 서열을 나타낸다).
요약: 본고는 주로 topic 방식의 consumer를 창설하여consumer의 창설 절차를 설명하는데 주로 세 부분으로 나뉜다.
a. kombu층의 exchange 대상 생성
b. kombu 층의queue 대상을 만듭니다.
c. kombu층의 exchange와queue 대상에 대한 정보를 통해amqp층의 exchange와queue 대상을 만들고amqp층의 exchange와queue 대상을 연결합니다.
# usr/lib/python2.7/site-packages/oslo_messaging/server.py
def start(self):
        """Start handling incoming messages.

        This method causes the server to begin polling the transport for
        incoming messages and passing them to the dispatcher. Message
        processing will continue until the stop() method is called.

        The executor controls how the server integrates with the applications
        I/O handling strategy - it may choose to poll for messages in a new
        process, thread or co-operatively scheduled coroutine or simply by
        registering a callback with an event loop. Similarly, the executor may
        choose to dispatch messages in a new thread, coroutine or simply the
        current thread.
        """
        if self._executor is not None:
            return
        try:
            listener = self.dispatcher._listen(self.transport)
        except driver_base.TransportDriverError as ex:
            raise ServerListenError(self.target, ex)

        self._executor = self._executor_cls(self.conf, listener,
                                            self.dispatcher)
        self._executor.start()

현재, 우리는 usr/lib/python2를7/site-packages/oslo_messaging/server.py의 start 방법 중의listener =self.dispatcher._listen(self.transport) 코드 프로세스 분석이 완료되었습니다.요약하면 다음과 같습니다.
1. oslo를 통해메시지 층과 kombu 층에서amqp 층의connection과 채널 대상을 아래로 만듭니다.
2.connection과 채널 대상을 바탕으로consumer(두 개의 topic와fanout 방식)를 만듭니다.내부는 exchange와queue 대상 생성, 그리고 그들 사이의 binding을 포함합니다.
다음 글은 나머지 두 문장의 코드 흐름을 설명할 것이다.

좋은 웹페이지 즐겨찾기