Subscriber

A subscriber is a component that listens for messages published by a publisher on a given topic and processes them accordingly.

Unlike a consumer, a subscriber receives messages only from the topics it has subscribed to; routing each message to the right subscribers is the publisher's responsibility.
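To make the topic-based dispatch described above concrete, here is a minimal sketch. `MiniSubscriber` is a hypothetical stand-in written for this illustration, not the library's `Subscriber`; it only mimics the documented method names, and silently ignoring topics without handlers is an assumption of the sketch.

```python
from collections import defaultdict
from typing import Any, Callable


class MiniSubscriber:
    """Hypothetical stand-in; not the library's Subscriber."""

    def __init__(self) -> None:
        # topic -> handlers registered for that topic, in registration order
        self.handlers: dict[str, list[Callable[..., None]]] = defaultdict(list)

    def register(self, topic: str, wrapped: Callable[..., None]) -> None:
        self.handlers[topic].append(wrapped)

    def subscribe(self, *topics: str) -> Callable:
        def decorator(wrapped: Callable[..., None]) -> Callable[..., None]:
            for topic in topics:
                self.register(topic, wrapped)
            return wrapped
        return decorator

    def receive(self, message: Any, topic: str) -> None:
        # Assumption: topics without handlers are ignored in this sketch.
        for handler in self.handlers.get(topic, []):
            handler(message)


subscriber = MiniSubscriber()
seen: list[str] = []


@subscriber.subscribe('loss')
def on_loss(loss: dict) -> None:
    seen.append(f"loss={loss['value']}")


subscriber.receive({'value': 0.1}, topic='loss')      # handled by on_loss
subscriber.receive({'value': 0.9}, topic='accuracy')  # no handler, ignored
print(seen)
```

Only the message sent on `'loss'` reaches a handler; the `'accuracy'` message is dropped because nothing subscribed to that topic.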

Methods:

Name       Description
override   Overrides a dependency with a concrete implementation.
subscribe  Decorator for registering a handler function to one or more topics.
receive    Receives a message from a given topic directly and triggers the corresponding handler functions to process it.

Example
subscriber = Subscriber()

@subscriber.subscribe('metrics')
def store_metric(metrics: list):
    # Re-deliver each metric to the topic named after it ('loss', 'accuracy').
    for metric in metrics:
        subscriber.receive(metric, metric['name'])

@subscriber.subscribe('loss')
def on_loss(loss):
    print(f"Loss: {loss['value']}")

@subscriber.subscribe('accuracy')
def on_accuracy(accuracy):
    print(f"Accuracy: {accuracy['value']}")

subscriber.receive(
    [
        {'name': 'loss', 'value': 0.1},
        {'name': 'accuracy', 'value': 0.9},
    ],
    topic='metrics',
)

# Output:
# Loss: 0.1
# Accuracy: 0.9

dependency_overrides property

Returns the dependency overrides for the subscriber. This is useful for late binding, testing, and changing the behavior of the subscriber at runtime.

Returns:

Name  Type  Description
dict  dict  A dictionary of the dependency map.

override(dependency, implementation)

Overrides a dependency with an implementation.

Parameters:

Name            Type      Description                           Default
dependency      Callable  The dependency function to override.  required
implementation  Callable  The implementation of the function.   required
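As an illustration of overriding a dependency in a test, the sketch below swaps a store that is unavailable for an in-memory list. How the library injects dependencies into handlers is not shown on this page, so `MiniSubscriber` and its `resolve` helper are hypothetical; only the names `override` and `dependency_overrides` come from the reference above.

```python
from typing import Callable


class MiniSubscriber:
    """Hypothetical stand-in that only models dependency overriding."""

    def __init__(self) -> None:
        self._overrides: dict[Callable, Callable] = {}

    @property
    def dependency_overrides(self) -> dict:
        return self._overrides

    def override(self, dependency: Callable, implementation: Callable) -> None:
        self._overrides[dependency] = implementation

    def resolve(self, dependency: Callable):
        # Hypothetical helper: call the override if one exists, else the original.
        return self._overrides.get(dependency, dependency)()


def metrics_store() -> list:
    # Stands in for a dependency that is not available in tests.
    raise RuntimeError("real store is not available in tests")


subscriber = MiniSubscriber()
fake_store: list = []
subscriber.override(metrics_store, lambda: fake_store)


def store_metric(metric: dict) -> None:
    subscriber.resolve(metrics_store).append(metric)


store_metric({'name': 'loss', 'value': 0.1})
print(fake_store)
print(metrics_store in subscriber.dependency_overrides)
```

Because the override is keyed by the original dependency function, the handler code does not change between production and test runs in this sketch.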

receive(message, topic)

Receives a message from a given topic and triggers the corresponding handler functions to process it. This is called by the publisher, but it is also useful for delivering messages between handlers directly.

Parameters:

Name     Type  Description                             Default
message  Any   The message to process.                 required
topic    str   The topic to process the message from.  required
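The publisher side of this call can be sketched as follows. The publisher API is not documented on this page, so `MiniPublisher`, its `publish` method, and `RecordingSubscriber` are all hypothetical names; the only thing taken from the reference is that the publisher routes by topic and then calls `receive(message, topic)`.

```python
from typing import Any


class RecordingSubscriber:
    """Hypothetical stand-in exposing only receive(message, topic)."""

    def __init__(self, *topics: str) -> None:
        self.topics = set(topics)
        self.received: list[tuple[str, Any]] = []

    def receive(self, message: Any, topic: str) -> None:
        self.received.append((topic, message))


class MiniPublisher:
    """Hypothetical publisher; the real publisher API is not shown here."""

    def __init__(self) -> None:
        self.subscribers: list[RecordingSubscriber] = []

    def publish(self, message: Any, topic: str) -> None:
        # Routing lives here: only subscribers of this topic get the message.
        for subscriber in self.subscribers:
            if topic in subscriber.topics:
                subscriber.receive(message, topic)


metrics = RecordingSubscriber('loss')
alerts = RecordingSubscriber('loss', 'error')

publisher = MiniPublisher()
publisher.subscribers.extend([metrics, alerts])

publisher.publish({'value': 0.1}, topic='loss')
publisher.publish('disk full', topic='error')

print(metrics.received)
print(alerts.received)
```

In this sketch `metrics` never sees the `'error'` message, because the filtering happens in the publisher before `receive` is called.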

register(topic, wrapped)

Registers a handler function with a given topic. Don't call this directly; use the subscribe decorator instead.

Parameters:

Name     Type                 Description                                     Default
topic    str                  The topic to register the handler function to.  required
wrapped  Callable[..., None]  The handler function to register.               required

subscribe(*topics)

Decorator for registering a handler function to one or more topics.

Parameters:

Name     Type  Description
*topics  str   The topics to register the handler function to.

Returns:

Type                 Description
Callable[..., None]  The decorated handler function.
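A short sketch of registering one handler on several topics, and of how the decorator relates to `register`. `MiniSubscriber` is again a hypothetical stand-in, and implementing `subscribe` as one `register` call per topic is an assumption about the relationship between the two methods, not a statement about the library's internals.

```python
from collections import defaultdict
from typing import Callable


class MiniSubscriber:
    """Hypothetical stand-in modelling only register and subscribe."""

    def __init__(self) -> None:
        self.handlers: dict[str, list[Callable[..., None]]] = defaultdict(list)

    def register(self, topic: str, wrapped: Callable[..., None]) -> None:
        self.handlers[topic].append(wrapped)

    def subscribe(self, *topics: str) -> Callable:
        def decorator(wrapped: Callable[..., None]) -> Callable[..., None]:
            # Assumption: the decorator makes one register call per topic.
            for topic in topics:
                self.register(topic, wrapped)
            return wrapped  # the handler itself is returned unchanged
        return decorator


subscriber = MiniSubscriber()
calls: list[str] = []


@subscriber.subscribe('loss', 'accuracy')
def log_metric(metric: dict) -> None:
    calls.append(f"{metric['name']}: {metric['value']}")


# The decorated function is still an ordinary callable.
log_metric({'name': 'loss', 'value': 0.1})

print(sorted(subscriber.handlers))
print(calls)
```

Returning the handler unchanged keeps it directly callable, which makes handlers easy to unit test without going through `receive`.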