Subscriber
Subscriber
A subscriber is a component that listens for messages published by a publisher on a given topic processes them accordingly.
Unlike a consumer, a subscriber receives messages only from the topics it has subscribed to and it's the publisher's responsibility to route the messages accordingly.
Methods:
Name | Description |
---|---|
override |
Overrides a dependency with a concrete implementation. |
subscribe |
Decorator for registering a handler function to one or more topics. |
receive |
Receives a message from a given topic directly and triggers the corresponding handler functions to process it. |
Example
subscriber = Subscriber()
@subscriber.subscribe('metrics')
def store_metric(metrics: list):
for metric in metrics:
subscriber.receive(metric, metric['name'])
@subscriber.subscribe('loss')
def on_loss(loss):
print(f"Loss: {loss['value']}")
@subscriber.subscribe('accuracy')
def on_accuracy(accuracy):
print(f"Accuracy: {accuracy['value']}")
subscriber.receive([
{'name': 'loss', 'value': 0.1},
{'name': 'accuracy', 'value': 0.9}],
topic='metrics')
# Output:
# Loss: 0.1
# Accuracy: 0.9
dependency_overrides
property
Returns the dependency overrides for the subscriber. This is useful for late binding, testing and changing the behavior of the subscriber in runtime.
Returns:
Name | Type | Description |
---|---|---|
dict |
dict
|
A dictionary of the dependency map. |
override(dependency, implementation)
Overrides a dependency with an implementation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dependency
|
Callable
|
The dependency function to override. |
required |
implementation
|
Callable
|
The implementation of the function. |
required |
receive(message, topic)
Receives a message from a given topic and triggers the corresponding handler functions to process it. This is called by the publisher but is also useful for deliver messages between handlers directly.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
message
|
Any
|
The message to process. |
required |
topic
|
str
|
The topic to process the message from. |
required |
register(topic, wrapped)
Registers a handler function with a given topic. Don't use this directly,
use the subscribe
decorator instead.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
topic
|
str
|
The topic to register the handler function to. |
required |
wrapped
|
Callable[..., None]
|
The handler function to register. |
required |
subscribe(*topics)
Decorator for registering a handler function to one or more topics. Args:
*topics (str): The topics to register the handler function to.
Returns:
Type | Description |
---|---|
Callable[..., None]
|
Callable[..., None]: The decorated handler function. |