This document describes the current stable version of Celery (5.6). For development docs, go here.

celery.worker.consumer

Worker consumer.

class celery.worker.consumer.Agent(c, **kwargs)[source]

Agent starts https://pypi.org/project/cell/ actors.

conditional = True
create(c)[source]

Create the step.

name = 'celery.worker.consumer.agent.Agent'
requires = (step:celery.worker.consumer.connection.Connection{()},)
class celery.worker.consumer.Connection(c, **kwargs)[source]

Service managing the consumer broker connection.

info(c)[source]
name = 'celery.worker.consumer.connection.Connection'
shutdown(c)[source]
start(c)[source]
class celery.worker.consumer.Consumer(on_task_request, init_callback=<function noop>, hostname=None, pool=None, app=None, timer=None, controller=None, hub=None, amqheartbeat=None, worker_options=None, disable_rate_limits=False, initial_prefetch_count=2, prefetch_multiplier=1, **kwargs)[source]

Consumer blueprint.

class Blueprint(steps=None, name=None, on_start=None, on_close=None, on_stopped=None)[source]

Consumer blueprint.

default_steps = ['celery.worker.consumer.connection:Connection', 'celery.worker.consumer.mingle:Mingle', 'celery.worker.consumer.events:Events', 'celery.worker.consumer.gossip:Gossip', 'celery.worker.consumer.heart:Heart', 'celery.worker.consumer.control:Control', 'celery.worker.consumer.tasks:Tasks', 'celery.worker.consumer.delayed_delivery:DelayedDelivery', 'celery.worker.consumer.consumer:Evloop', 'celery.worker.consumer.agent:Agent']
name = 'Consumer'
shutdown(parent)[source]
Strategies

alias of dict

add_task_queue(queue, exchange=None, exchange_type=None, routing_key=None, **options)[source]
apply_eta_task(task)[source]

Method called by the timer to apply a task with an ETA/countdown.

broker_connection_retry_attempt = 0

Counter to track number of conn retry attempts to broker. Will be reset to 0 once successful

bucket_for_task(type)[source]
call_soon(p, *args, **kwargs)[source]
cancel_active_requests()[source]

Cancel active requests during shutdown.

Cancels all active requests that either do not require late acknowledgments or, if they do, have not been acknowledged yet.

Does not cancel successful tasks, even if they have not been acknowledged yet.

cancel_task_queue(queue)[source]
connect()[source]

Establish the broker connection used for consuming tasks.

Retries establishing the connection if the broker_connection_retry setting is enabled

connection_for_read(heartbeat=None)[source]
connection_for_write(url=None, heartbeat=None)[source]
create_task_handler(promise=<class 'vine.promises.promise'>)[source]
ensure_connected(conn)[source]
first_connection_attempt = True

This flag will be turned off after the first failed connection attempt.

init_callback = None

Optional callback called the first time the worker is ready to receive tasks.

loop_args()[source]
property max_prefetch_count
on_close()[source]
on_connection_error_after_connected(exc)[source]
on_connection_error_before_connected(exc)[source]
on_decode_error(message, exc)[source]

Callback called if an error occurs while decoding a message.

Simply logs the error and acknowledges the message so it doesn’t enter a loop.

Parameters:
  • message (kombu.Message) – The message received.

  • exc (Exception) – The exception being handled.

on_invalid_task(body, message, exc)[source]
on_ready()[source]
on_send_event_buffered()[source]
on_unknown_message(body, message)[source]
on_unknown_task(body, message, exc)[source]
perform_pending_operations()[source]
pool = None

The current worker pool instance.

register_with_event_loop(hub)[source]
reset_rate_limits()[source]
restart_count = -1
shutdown()[source]
start()[source]
stop()[source]
timer = None

A timer used for high-priority internal tasks, such as sending heartbeats.

update_strategies()[source]
class celery.worker.consumer.Control(c, **kwargs)[source]

Remote control command service.

include_if(c)[source]

Return true if bootstep should be included.

You can define this as an optional predicate that decides whether this step should be created.

name = 'celery.worker.consumer.control.Control'
requires = (step:celery.worker.consumer.tasks.Tasks{(step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)},)
class celery.worker.consumer.Events(c, task_events=True, without_heartbeat=False, without_gossip=False, **kwargs)[source]

Service used for sending monitoring events.

name = 'celery.worker.consumer.events.Events'
requires = (step:celery.worker.consumer.connection.Connection{()},)
shutdown(c)[source]
start(c)[source]
stop(c)[source]
class celery.worker.consumer.Gossip(c, without_gossip=False, interval=5.0, heartbeat_interval=2.0, **kwargs)[source]

Bootstep consuming events from other workers.

This keeps the logical clock value up to date.

call_task(task)[source]
compatible_transport(app)[source]
compatible_transports = {'amqp', 'redis'}
election(id, topic, action=None)[source]
get_consumers(channel)[source]
label = 'Gossip'
name = 'celery.worker.consumer.gossip.Gossip'
on_elect(event)[source]
on_elect_ack(event)[source]
on_message(prepare, message)[source]
on_node_join(worker)[source]
on_node_leave(worker)[source]
on_node_lost(worker)[source]
periodic()[source]
register_timer()[source]
requires = (step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)
start(c)[source]
class celery.worker.consumer.Heart(c, without_heartbeat=False, heartbeat_interval=None, **kwargs)[source]

Bootstep sending event heartbeats.

This service sends a worker-heartbeat message every n seconds.

Note

Not to be confused with AMQP protocol level heartbeats.

name = 'celery.worker.consumer.heart.Heart'
requires = (step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)
shutdown(c)
start(c)[source]
stop(c)[source]
class celery.worker.consumer.Mingle(c, without_mingle=False, **kwargs)[source]

Bootstep syncing state with neighbor workers.

At startup, or upon consumer restart, this will:

  • Sync logical clocks.

  • Sync revoked tasks.

compatible_transport(app)[source]
compatible_transports = {'amqp', 'gcpubsub', 'redis'}
label = 'Mingle'
name = 'celery.worker.consumer.mingle.Mingle'
on_clock_event(c, clock)[source]
on_node_reply(c, nodename, reply)[source]
on_revoked_received(c, revoked)[source]
requires = (step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)
send_hello(c)[source]
start(c)[source]
sync(c)[source]
sync_with_node(c, clock=None, revoked=None, **kwargs)[source]
class celery.worker.consumer.Tasks(c, **kwargs)[source]

Bootstep starting the task message consumer.

info(c)[source]

Return task consumer info.

name = 'celery.worker.consumer.tasks.Tasks'
qos_global(c) bool[source]

Determine if global QoS should be applied.

Additional information:

https://www.rabbitmq.com/docs/consumer-prefetch https://www.rabbitmq.com/docs/quorum-queues#global-qos

requires = (step:celery.worker.consumer.mingle.Mingle{(step:celery.worker.consumer.events.Events{(step:celery.worker.consumer.connection.Connection{()},)},)},)
shutdown(c)[source]

Shutdown task consumer.

start(c)[source]

Start task consumer.

stop(c)[source]

Stop task consumer.