Using Postgres as a task queue for an asynchronous-workers architecture
Assume a scenario where you have one function producing tasks and multiple asynchronous workers consuming those tasks and working on them.
It is essentially a Single Producer, Multiple Consumer system. How would you implement a queue to solve this problem?
Would you use an in-memory queue, like the ones we get in programming languages?
Watch out: the application is going to be deployed in replicated mode with containers, so we can't really keep state in our application, because we have no guarantee that a task will finish before the container dies.
And if we use an in-memory data structure (a queue) to store tasks, we lose the point of a replicated deployment, because only one instance would be carrying out the computations.
So, we are not going to use an in-memory queue.
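To make the trade-off concrete, here is a minimal sketch (names illustrative) of the in-memory approach with a Go channel. All queued tasks live inside one process, so a crash or redeploy wipes the queue, and other replicas can never pick the work up:

```go
package main

import "fmt"

// Task mirrors what our application will eventually enqueue.
type Task struct {
	Text string
	Time uint32
}

func main() {
	// a buffered channel acting as an in-memory queue
	queue := make(chan Task, 100)

	// producer side
	queue <- Task{Text: "hello", Time: 2}

	// consumer side
	task := <-queue
	fmt.Printf("working on %q for %d intervals\n", task.Text, task.Time)

	// the queue exists only in this process's memory:
	// if the container dies here, any remaining tasks are gone
}
```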
We need something on disk, like a database. I know two ways of implementing these workers:
Having a separate application for the producer and for the workers.
This architecture allows us to use a message queue like RabbitMQ. A message queue is more powerful when it comes to delivering messages and data reliably.
RabbitMQ will push the message/data to our worker, so the worker does not need to poll for messages from the queue.
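For contrast, here is a rough sketch of that push model using the github.com/rabbitmq/amqp091-go client (this library, the connection URL, and the queue name are not part of this project; they are only illustrative). The broker delivers messages to us over a Go channel, so there is no polling loop:

```go
package main

import (
	"log"

	amqp "github.com/rabbitmq/amqp091-go"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}

	// the broker pushes deliveries into this channel as they arrive
	deliveries, err := ch.Consume("tasks", "", true, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	for d := range deliveries {
		log.Printf("got task: %s", d.Body)
	}
}
```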
But what if the producer and the worker are in the same application?
Running both in a single application, with the producers and the workers working asynchronously.
In this architecture the producer and the worker live in the same application, which is why pulling in a separate message broker adds operational overhead without much benefit.
So, finally, we are going to use Postgres to implement the queue. Here the workers need to poll the database for data, because Postgres will not push tasks to us the way RabbitMQ does.
The Problem.
We are going to create an application that echoes some text for some number of intervals (time), where both the text and the time are given by the user.
So let's go...
I am going to use Go for the example, though using PostgreSQL as a task queue will work with any language.
Let's create a brand-new project.
You can get the final source code at k9exp/postgres-task-queue
go mod init k9exp/postgres-task-queue
touch main.go # create main.go file
Add this code to your main.go; it creates a simple HTTP server listening on port 4545. The endpoint /producer is used to trigger the producer.
package main
import (
"log"
"net/http"
)
type RequestPayload struct {
Text string `json:"text"`
Time uint32 `json:"time"`
}
// POST /producer
func producer(w http.ResponseWriter, r *http.Request) {
// retrieve text and time
// insert the text and time in the postgres table
// return a response
}
func worker(err chan error) {
// poll the first element from the queue
// do whatever the task requires
// repeat
}
func app(err chan error) {
http.HandleFunc("/producer", producer)
PORT := "4545"
log.Printf("Serving on http://localhost:%s\n", PORT)
err <- http.ListenAndServe(":"+PORT, nil)
}
func main() {
err := make(chan error, 1)
go app(err)
go worker(err)
e := <-err
log.Printf("Got error: %v\n", e.Error())
}
Let's focus on producer. We need to get the text and time from the request body and insert the two into the task queue; that's all the producer has to do.
Go ahead and extract the text and the time from the request body (which is JSON) using encoding/json from the Go standard library.
[...]
import (
"encoding/json" // new import
"log"
"net/http"
)
type RequestPayload struct {
Text string `json:"text"`
Time uint32 `json:"time"`
}
// POST /producer
func producer(w http.ResponseWriter, r *http.Request) {
var requestPayload RequestPayload
err := json.NewDecoder(r.Body).Decode(&requestPayload)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
text := requestPayload.Text
time := requestPayload.Time
// insert the text and time in the postgres table
// return a response
}
[...]
Now we have to insert the data into the database, but we haven't talked about the database yet. So let's start.
Create a new module called data in the project to handle all the database-related stuff.
mkdir data # create data directory
touch data/queue.go # create queue.go file in data directory
The queue.go will look something like this:
package data
func SetupQueue() {
// setup database
// create queue table
}
func InsertTask() {
// insert task in the queue
}
To use PostgreSQL from Go we need a Postgres driver for the standard library's database/sql package, so we are going to use github.com/lib/pq.
Download it:
go get github.com/lib/pq
After it downloads, let's write a SQL query to create the queue table in the database.
package data
// new imports
import (
"database/sql"
"log"
"os"
_ "github.com/lib/pq"
)
var DB *sql.DB // new global variable
func SetupQueue() error {
connString := os.Getenv("DATABASE_URL")
db, err := sql.Open("postgres", connString)
if err != nil {
return err
}
DB = db // Set the global variable
// CREATE QUEUE TABLE
_, err = DB.Exec(
`CREATE TABLE IF NOT EXISTS queue (
task_id SERIAL PRIMARY KEY,
text TEXT NOT NULL,
time INT NOT NULL
);
`)
if err != nil {
return err
}
log.Println("Table \"queue\" is created in the database")
return nil
}
To connect to Postgres we need a running Postgres server; you can run one with Docker or use a managed Postgres service like Neon. Either way, you will have a database connection string that looks like this: postgres://username:password@host:port/database
We are supplying this DATABASE_URL through an environment variable.
Let's use SetupQueue in main.go:
package main
import (
"encoding/json"
"log"
"net/http"
"k9exp/postgres-task-queue/data" // new import
)
[...]
func main() {
db_err := data.SetupQueue()
if db_err != nil {
log.Fatal(db_err)
}
err := make(chan error, 1)
go app(err)
go worker(err)
e := <-err
log.Printf("Got error: %v\n", e.Error())
}
When the app runs, the table will be created. Let's run the app.
I am using Neon for a managed Postgres database; it's free.
export DATABASE_URL=postgres://kunalsin9h:[email protected]/queue
go run main.go
The output will be
$ go run main.go
2023/11/07 15:26:53 Table "queue" is created in the database
2023/11/07 15:26:53 Serving on http://localhost:4545
The database should now have a new table called queue.
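If you want to double-check, psql can connect with the same connection string (assuming you have the psql client installed):

```bash
psql "$DATABASE_URL" -c '\dt'
```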
Nice, we are making progress...
Now back to producer. We have extracted the text and time from the request payload; next we need to insert them into the database, and for that we need to write an INSERT SQL query.
So let's complete InsertTask in data/queue.go. It is easy: we just need to insert the two values into the table.
[...]
func InsertTask(text string, time uint32) error {
_, err := DB.Exec(`
INSERT INTO queue (text, time)
VALUES ($1, $2)
`, text, time)
return err
}
Now let's use this new function to insert the text and time extracted in the producer handler:
package main
import (
"encoding/json"
"log"
"net/http"
"k9exp/postgres-task-queue/data" // new import
)
// POST /producer
func producer(w http.ResponseWriter, r *http.Request) {
var requestPayload RequestPayload
err := json.NewDecoder(r.Body).Decode(&requestPayload)
if err != nil {
w.WriteHeader(http.StatusBadRequest)
return
}
text := requestPayload.Text
time := requestPayload.Time
// new
// inserting the data into database
err = data.InsertTask(text, time)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
// the status code must be written before the body
w.WriteHeader(http.StatusAccepted)
w.Write([]byte("Task added in the queue\n"))
}
[...]
At this point the /producer endpoint works and adds the text and time data into the queue table.
To check it, run the app...
export DATABASE_URL=....
go run main.go
To add a new entry to the database table, do:
$ curl -X POST -d '{"text": "new_text", "time": 2}' http://localhost:4545/producer
Task added in the queue
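You can also confirm that the row landed in the table (again, assuming psql is installed):

```bash
psql "$DATABASE_URL" -c 'SELECT * FROM queue;'
```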
Awesome, we got the producer working!
Now let's focus on worker. As of now, the worker function looks like this:
[...]
func worker(err chan error) {
// poll the first element from the queue
// do whatever the task requires
// repeat
}
[...]
As we already discussed, the worker has to poll for data from the queue. Polling here means fetching data from the data source at a regular interval.
So, to get the first element of the queue, let's write a GetTask function in data/queue.go:
[...]
type TaskData struct {
TaskID uint32
Text string
Time uint32
}
func GetTask() (*TaskData, error) {
// todo
return nil, nil
}
We need to get the first element from the queue, and also delete it.
import (
"context" // new import
"database/sql"
"errors"
"log"
"os"
_ "github.com/lib/pq"
)
[...]
type TaskData struct {
TaskID uint32
Text string
Time uint32
}
func GetTask() (*TaskData, error) {
var data TaskData
trx, err := DB.BeginTx(context.Background(), nil)
if err != nil {
return nil, err
}
defer trx.Rollback()
err = trx.QueryRow(`SELECT task_id, text, time FROM queue ORDER BY task_id LIMIT 1 FOR UPDATE SKIP LOCKED;`).Scan(&data.TaskID, &data.Text, &data.Time)
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
} else if err != nil {
return nil, err
}
_, err = trx.Exec("DELETE FROM queue WHERE task_id = $1;", data.TaskID)
if err != nil {
return nil, err
}
if err := trx.Commit(); err != nil {
return nil, err
}
return &data, nil
}
Here, DB.BeginTx() starts a transaction and trx.Commit() commits it. The deferred trx.Rollback() is a safety net: it rolls the transaction back if we return early on an error, and becomes a no-op once the transaction has been committed.
The most important part of this blog is here. How will we choose the first element? Without an ORDER BY, a SELECT is not guaranteed to return rows in insertion order, so we order by task_id to get first-in, first-out behavior. But how do we stop two workers from grabbing the same row?
What we are going to use are database locks. The query contains the FOR UPDATE and SKIP LOCKED clauses.
FOR UPDATE: This clause is used in a transaction to indicate that the rows selected by the query are to be locked exclusively by the current transaction. This prevents other transactions from modifying or locking the same rows until the current transaction is committed or rolled back.
SKIP LOCKED: This is an extension to the "FOR UPDATE" clause. When used, it allows the SELECT statement to skip over any rows that are already locked by another transaction. This is useful in scenarios where you want to avoid waiting for locked rows and instead proceed with the next available unlocked row.
This way we always get a row that is not selected by any other worker. After we read the data from the row, we delete it.
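To see why this matters, imagine two workers running the exact same query at the same time; here is a sketch of what each session would observe:

```sql
-- Session A (worker 1)
BEGIN;
SELECT task_id, text, time FROM queue
ORDER BY task_id LIMIT 1
FOR UPDATE SKIP LOCKED;  -- locks row 1 and returns it

-- Session B (worker 2), running concurrently
BEGIN;
SELECT task_id, text, time FROM queue
ORDER BY task_id LIMIT 1
FOR UPDATE SKIP LOCKED;  -- row 1 is locked, so it is skipped; returns row 2
```

Each worker gets a different row, and neither blocks waiting for the other.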
Let's use this function in the worker function:
[...]
func worker(err chan error) {
for {
// named task so it does not shadow the data package
task, e := data.GetTask()
if e != nil {
err <- e
return
}
if task == nil {
fmt.Println("Queue is empty")
time.Sleep(10 * time.Second)
continue
}
printText(task)
time.Sleep(1 * time.Second)
}
}
func printText(task *data.TaskData) {
for i := 0; i < int(task.Time); i++ {
fmt.Printf("\t%d > %s [%d/%d]\n", task.TaskID, task.Text, i+1, task.Time)
time.Sleep(200 * time.Millisecond)
}
}
[...]
We loop forever with a 1-second pause between tasks; when the queue is empty, we pause for 10 seconds instead.
When we get data, we print it using the printText function, sleeping 200 milliseconds (1/5 of a second) between prints, once for each of the time intervals.
With this, we have finished the application.
What should it do?
It will get the topmost element from the queue (in our case, from the Postgres table), delete the entry (i.e. consume it), and print the text for time intervals.
We can use the POST /producer endpoint to add entries to the queue.
Let's run it again; here is a video of me running it.
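If you can't watch the video: for the task we enqueued earlier (text "new_text", time 2), the worker output looks something like this:

```
Queue is empty
	1 > new_text [1/2]
	1 > new_text [2/2]
```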
The real magic happens when we have more than one worker.
So let's refactor the code to show the worker ID in the output. We will pass a worker_id to each worker and use it to tag that worker's output; this requires changes in a few places.
[...]
func worker(err chan error, worker_id uint16) { // worker id
for {
task, e := data.GetTask()
if e != nil {
err <- e
return
}
if task == nil {
fmt.Println("Queue is empty")
time.Sleep(10 * time.Second)
continue
}
printText(task, worker_id) // passing it here
time.Sleep(1 * time.Second)
}
}
func printText(task *data.TaskData, worker_id uint16) {
for i := 0; i < int(task.Time); i++ {
// using worker_id to tag output from this worker
fmt.Printf("\ttask: %d, by worker: %d > %s [%d/%d]\n", task.TaskID, worker_id, task.Text, i+1, task.Time)
time.Sleep(200 * time.Millisecond)
}
}
func app(err chan error) {
http.HandleFunc("/producer", producer)
PORT := "4545"
log.Printf("Serving on http://localhost:%s\n", PORT)
err <- http.ListenAndServe(":"+PORT, nil)
}
func main() {
db_err := data.SetupQueue()
if db_err != nil {
log.Fatal(db_err)
}
err := make(chan error, 1)
go app(err)
go worker(err, 1) // here giving worker_id
e := <-err
log.Printf("Got error: %v\n", e.Error())
}
Now we can spawn more workers just by calling worker multiple times with different worker IDs, like this:
[...]
func main() {
db_err := data.SetupQueue()
if db_err != nil {
log.Fatal(db_err)
}
err := make(chan error, 1)
go app(err)
go worker(err, 1)
go worker(err, 2)
e := <-err
log.Printf("Got error: %v\n", e.Error())
}
Let's see the output.
So you can see both workers are working together to solve the problem.
We can even spawn hundreds of workers; let's do that.
[...]
func main() {
db_err := data.SetupQueue()
if db_err != nil {
log.Fatal(db_err)
}
err := make(chan error, 1)
go app(err)
for i := 0; i < 100; i++ {
go worker(err, uint16(i+1))
}
e := <-err
log.Printf("Got error: %v\n", e.Error())
}
Let's see the output. It's going to be wild; I am going to insert a lot more rows.
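One quick way to flood the queue is a small shell loop (the payload values here are arbitrary):

```bash
for i in $(seq 1 50); do
  curl -X POST -d "{\"text\": \"task_$i\", \"time\": 3}" http://localhost:4545/producer
done
```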
Wow, how cool. And this is just one instance of the application; imagine the app deployed on Kubernetes with 10 replicas and 100 workers each...
So let's wrap up what we have done:
- We used PostgreSQL for our task queue, because we need something durable.
- The worker and the producer are in the same application; that's why we have not used a message queue like RabbitMQ.
- We add tasks to the queue using a REST API.
- The tasks are completed by the async workers.
I hope you enjoyed the blog. If you like the content, don't forget to subscribe to the blog to get notified about future posts.