Skip to content

Queues with Producers and Consumers

Tina4 has a light-weight queue mechanism built in using the litequeue library which uses an Sqlite3 database as the container for the queue. For most single instance use cases this is sufficient but if you need to load balance or scale your queue mechanism need to consider other mechanisms. We provide default support for kafka and rabbitmq

Basic Queue with Litequeue

Configuration

We start with a queue configuration

from tina4_python.Queue import Config, Queue, Producer, Consumer

config = Config()
config.litequeue_database_name = "test_queue.db"
# config.queue_type = "litequeue,rabbitmq,kafka"

Queue and Producer

We initialize the queue and create a producer. Producers are used to produce information and publish it to a queue.

queue = Queue(config, topic="some-queue")
producer = Producer(queue)

# produce some data

producer.produce({"event": "create_log", "log_info": "This is an example of a producer creating a log event"})

Consumer

Typically, our consumer will run in a separate thread / or file or even different application. We need to use the same configuration to initialize the queue. The consumer has a call_back to process the messages from the queue. The example below is of a manual acknowledgement which is the most complicated form we support.

from tina4_python.Queue import Config, Queue, Consumer

config = Config()
config.litequeue_database_name = "test_queue.db"
# config.queue_type = "litequeue,rabbitmq,kafka"

def queue_message(queue, err, data):
    # We have set acknowledge to false on our consumer so we have to manually acknowledge the message
    if data is not None and data.status == 1:
        queue.done(data.message_id)

    print("RESULT", err, data)

queue = Queue(config, topic="some-queue")


# Run a consumer with one-second sleep cycles with manual acknowledgement
consumer = Consumer(queue, queue_message, acknowledge=False)
consumer.run(1)

Rabbitmq Queue

You need to install a Rabbit MQ library and this assumes you have RabbitMQ running successfully.

poetry add pika

Configuration

The configuration would be as below and you can add extra config keys depending on what is needed.

config = Config()
config.queue_type = "rabbitmq"
config.rabbitmq_config = {"host": "localhost", "port": 5672}

Kafka Queue

You need to install a Kafka library and this assumes you have Kafka running successfully.

poetry add confluent-kafka

Configuration

The configuration would be as below and you can add extra config keys depending on what is needed.

config = Config()
config.queue_type = "kafka"

# producer minimal config
config.kafka_config  = {
    # User-specific properties that you must set
    'bootstrap.servers': 'localhost:9092',
}

# consumer minimal config
config.kafka_config  = {
    # User-specific properties that you must set
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'default-queue',
    'auto.offset.reset': 'earliest'
}