Skip to content

Raw Message & Eager response

When writing actors, sometimes you need to access the raw message data directly or interact with the message broker prematurely (before the actor finishes executing).

Repid provides the Message dependency for exactly this purpose.

The Message object

You can access the current message being processed by injecting the Message object into your actor. Under the hood, this uses the MessageDependency alias in Repid's dependency injection system.

from typing import Annotated
from repid import Router
from repid import Message

router = Router()

@router.actor
async def my_actor(message: Message) -> None:
    print(message.payload)  # Raw bytes payload
    print(message.headers)  # Dictionary of headers
    print(message.channel)  # The channel this message was received from
    print(message.message_id) # The unique ID of the message (if supported by broker)

Eager responses

By default, Repid automatically acknowledges (ack) a message if your actor returns successfully, and negatively acknowledges (nack) or rejects it if an exception is raised.

However, sometimes you might want to immediately act on the message. Using the Message dependency, you can manually trigger these actions inside your actor.

from repid import Router, Message

router = Router()

@router.actor
async def my_actor(user_id: int, message: Message) -> None:
    if user_id < 0:
        # Invalid user_id. Nack the message immediately.
        await message.nack()
        return

    # Acknowledge early - fire-and-forget pattern
    await message.ack()

    # Now we can do some long-running processing...
    await do_heavy_lifting(user_id)

Available Actions

The Message object provides the following actions:

  • await message.ack(): Acknowledge the message (successful processing).
  • await message.nack(): Negatively acknowledge the message (e.g. temporary failure, usually retry or DLQ depending on the broker).
  • await message.reject(): Reject the message (wasn't accepted for processing, put back in the original queue).
  • await message.reply(payload=b"..."): Atomically (if supported by the server) acknowledge the message and send a reply message.
  • await message.reply_json(payload={"status": "ok"}): Atomically acknowledge and reply with JSON data.

If atomic reply is not suppored by the broker, it's usually the same as calling .ack() and .send_message().

You can also check if a message has already been acted upon using the .is_acted_on property.

Note

Message can only be acted on once. Any later actions are discarded.

Sending New Messages

You can also use the Message object to publish entirely new messages while processing the current one. This is very useful for chaining tasks or event-driven architectures.

from repid import Router, Message

router = Router()

@router.actor
async def process_order(order_id: int, message: Message) -> None:
    # Process the order here...

    # Send an event to another channel
    await message.send_message_json(
        channel="notifications",
        payload={"event": "order_processed", "order_id": order_id}
    )