Common Utilities - kombu.common

kombu.common

Common Utilities.

class kombu.common.Broadcast(name=None, queue=None, **kwargs)

Convenience class used to define broadcast queues.

Every queue instance will have a unique name, and both the queue and exchange is configured with auto deletion.

Parameters:
  • name – This is used as the name of the exchange.
  • queue – By default a unique id is used for the queue name for every consumer. You can specify a custom queue name here.
  • **kwargs – See Queue for a list of additional keyword arguments supported.
kombu.common.maybe_declare(entity, channel=None, retry=False, **retry_policy)
kombu.common.uuid()

Generate a unique id, having - hopefully - a very small chance of collision.

For now this is provided by uuid.uuid4().

kombu.common.itermessages(conn, channel, queue, limit=1, timeout=None, Consumer=<class 'kombu.messaging.Consumer'>, callbacks=None, **kwargs)
kombu.common.send_reply(exchange, req, msg, producer=None, **props)
kombu.common.isend_reply(pool, exchange, req, msg, props, **retry_policy)
kombu.common.collect_replies(conn, channel, queue, *args, **kwargs)
kombu.common.insured(pool, fun, args, kwargs, errback=None, on_revive=None, **opts)

Ensures function performing broker commands completes despite intermittent connection failures.

kombu.common.ipublish(pool, fun, args=(), kwargs={}, errback=None, on_revive=None, **retry_policy)
kombu.common.drain_consumer(consumer, limit=1, timeout=None, callbacks=None)
kombu.common.eventloop(conn, limit=None, timeout=None, ignore_timeouts=False)

Best practice generator wrapper around Connection.drain_events.

Able to drain events forever, with a limit, and optionally ignoring timeout errors (a timeout of 1 is often used in environments where the socket can get “stuck”, and is a best practice for Kombu consumers).

Examples

eventloop is a generator:

>>> from kombu.common import eventloop

>>> it = eventloop(connection, timeout=1, ignore_timeouts=True)
>>> it.next()   # one event consumed, or timed out.

>>> for _ in eventloop(connection, timeout=1, ignore_timeouts=True):
...     pass  # loop forever.

It also takes an optional limit parameter, and timeout errors are propagated by default:

for _ in eventloop(connection, limit=1, timeout=1):
    pass

See also

itermessages(), which is an event loop bound to one or more consumers, that yields any messages received.

Table Of Contents

Previous topic

Connection

Next topic

Mixin Classes - kombu.mixins

This Page