Chuyện mấy con Consumer

Dong Nguyen
5 min readDec 22, 2020

--

Vào một ngày đẹp trời, đang ngồi code lan man mình nhận được thông báo từ xếp, một con service tracking của một project cũ đang gây nghẽn database, chuyện là cty đó đang ăn nên làm ra traffic vô site tăng gấp 15 lần, user tương tác nhiều dẫn đến các con service khác cũng tải nhiều hơn, db thì càng ngày càng bị hấp diêm nhiều hơn, ko chỉ đến từ còn service tracking mà còn đến từ nhiều con khác.

Tracking thì ngày trước mình code theo mô hình đơn giản, Client gọi API, đẩy message vào 1 cái queue, có tầm 100 con consumer ngồi hốt message ra process rồi ghi vô database.

Nhưng đến nước này thì chỉ có thể giảm tải cho database bằng cách giảm số lượng con consumer xuống, hạn chế tải đồng thời cho database. Trước kia code vô nhân đạo nên mình ko handle chuyện cần restart lại App thì mới update config. Tranh thủ lúc đang nhàn rỗi, mấy dự án đang ngồi chờ chốt requirement nên tranh thủ update sửa cho em nó.

Nhu cầu cần thiết lúc này là

  • Chỉ cần update file config là app tự động update số consumer
  • Khi deploy new code thì phải graceful shutdown, chờ mỗi consumer xử lí xong message và cho em nó nghỉ ngơi.

Viper có sẵn tính năng watch config file nên mình sẽ dùng luôn

func watchConsumerCount() {
viper.WatchConfig()
viper.OnConfigChange(func(e fsnotify.Event) {
fmt.Println("Config file changed:", e.Name)
n := viper.GetInt("consumer_count")
if n != consumer_count {
fmt.Println("resize consumer count to => ", n)
go ResizeConsumerCount(n)
}
})
}

Khi config có sự thay đổi thì nếu là thay đổi giá trị số consumer thì mình tiến hành chạy function ResizeConsumerCount để update lại số lượng consumer

Để quản lí mỗi consumer sẽ thì mình có cái slice mỗi phần từ có type như sau

type ConsumerRoutine struct {
Id int // ID của consumer
Exit chan bool // Channel để nhận lệnh exit
Channel *amqp.Channel // AMQP channel
Delivery <-chan amqp.Delivery // AMQP delivery chan,để nhận message từ queue
Done chan bool // Channel báo cáo lên thằng quản lí tao xong hết rồi, chuẩn bị thoát
}

Mỗi consumer khi chạy sẽ ôm cái biến này để giao tiếp với thằng quản lí.

Khởi tạo consumer và các thần phần cần thiết khác.

var amql_conn *amqp.Connection
var amqp_url string
var consumer []*ConsumerRoutine
const max_count = 100
var consumer_count int
var queue_name string

func loadConf() {
viper.SetConfigName("config")
viper.SetConfigType("yaml")
viper.AddConfigPath(".")
err := viper.ReadInConfig()
if err != nil {
panic(fmt.Errorf("Fatal error config file: %s \n", err))
}
amqp_url = viper.GetString("amqp_url")
consumer_count = viper.GetInt("consumer_count")
queue_name = viper.GetString("queue_name")
}

func initAMQPConn() (err error) {
amql_conn, err = amqp.Dial(amqp_url)
if err != nil {
panic(err)
}
return err
}

func initConsumerTask() {
consumer = make([]*ConsumerRoutine, max_count)
}

func init() {
loadConf()
initAMQPConn()
initConsumerTask()
go watchConsumerCount()
}

Bắt đầu tạo consumer, mỗi consumer sẽ được chạy trong 1 goroutine và update tình trạng vào cái biến mà nó đươc giao

func startNewConsumer(m *ConsumerRoutine) {
m.Exit = make(chan bool)
m.Done = make(chan bool)
m.Channel, _ = amql_conn.Channel()
m.Channel.QueueDeclare(queue_name, true, false, false, false, nil)
m.Delivery, _ = m.Channel.Consume(queue_name, "", false, false, false, false, nil)
fmt.Println("Consumer : ", m.Id, "started")
for {
select {
case <-m.Exit: // Lắng nghe lệnh exit từ bên ngoài
m.Channel.Close() // Đóng amqp channel
m.Channel = nil
m.Done <- true // Báo cáo là sẵn sàng thoát
fmt.Println("Consumer ", m.Id, " ended")
return
case msg := <-m.Delivery: // Chờ message từ queue
// Xử lí message ở đây
fmt.Println("Consumer ", m.Id, "recived message = ", string(msg.Body))
// Ack hoặc Reject message, ở đây để luôn có data test nên mình reject luôn :pikatroll:
msg.Reject(true)
}
}
}

Mỗi khi chạy một con consumer mới thì nó sẽ lắng nghe lệnh exit từ bên ngoài thông qua channel m.Exit , nhận message từ queue qua channel m.Delivery . Có message thì sẽ xử lí khi nào xong thì nghe tiếp.

Đến lượt function thay đổi số lượng consumer

func ResizeConsumerCount(n int) {
// scale up
// Chỗ này thì thêm phần từ vô slice và sau đó thì chạy một con
// consumer và truyền biến vào để giao tiếp
if n >= consumer_count {
val := consumer_count
consumer_count = n
for i := val; i < n; i++ {
consumer[i] = &ConsumerRoutine{
Id: i,
Exit: make(chan bool),
Done: make(chan bool),
}
go startNewConsumer(consumer[i])
}
return
}
// scale down
// Giả sự số lượng hiện tại là consumer_count
// và số consumer sau khi giảm là n thì mình sẽ
// gửi lệnh exit cho những con consumer từ n->val-1
// vì slice được đếm từ 0
val := consumer_count
consumer_count = n
for i := n; i < val; i++ {
fmt.Println("shuting down consumer : ", i)
go func(m *ConsumerRoutine) {
fmt.Println("Exit Chan Pointer = ", &m.Exit)
m.Exit <- true
}(consumer[i])
<-consumer[i].Done
}
}

Và function main nơi mà mọi thứ bắt đầu.

func main() {
// Mình chạy consumer_count con consumer từ file config
for i := 0; i < consumer_count; i++ {
consumer[i] = &ConsumerRoutine{
Id: i,
}
go startNewConsumer(consumer[i])
}
// khởi tạo 1 channel để lắng nghe signal
stop := make(chan os.Signal, 1)
signal.Notify(stop)
for {
v := <-stop
fmt.Println("SIGNAL : ", v.String())
// nếu signal là hangup thì mình sẽ tắt hết consumer và thoát app
if v.String() == "hangup" {
ResizeConsumerCount(0)
os.Exit(0)
}
}
}

Dưới dây là file config.

amqp_url: "amqp://admin:admin@172.31.39.18:5672"
queue_name: "queue:demo"
consumer_count: 10

Build và chạy demo nào

➜  resizeConsumer go build && ./rc
Consumer : 0 started Exit pointer val = 0xc00019a018
Consumer : 1 started Exit pointer val = 0xc00019a058
SIGNAL : urgent I/O condition
SIGNAL : urgent I/O condition
SIGNAL : urgent I/O condition
SIGNAL : urgent I/O condition
SIGNAL : urgent I/O condition

Sửa config thì 4 con consumer

amqp_url: "amqp://admin:admin@172.31.39.18:5672"
queue_name: "queue:demo"
consumer_count: 4

Thấy update trong console

SIGNAL :  urgent I/O condition
SIGNAL : urgent I/O condition
SIGNAL : urgent I/O condition
SIGNAL : urgent I/O condition
Config file changed: /Users/vimftw/Desktop/code/resizeConsumer/config.yaml
resize consumer count to => 4
SIGNAL : urgent I/O condition
Consumer : 2 started Exit pointer val = 0xc00005a718
Consumer : 3 started Exit pointer val = 0xc00005a758
SIGNAL : urgent I/O condition

Scale down xuống 1

amqp_url: "amqp://admin:admin@172.31.39.18:5672"
queue_name: "queue:demo"
consumer_count: 1

Output

Config file changed: /Users/vimftw/Desktop/code/resizeConsumer/config.yaml
resize consumer count to => 1
shuting down consumer : 1
Exit Chan Pointer = 0xc00019a058
have to exit now
Consumer 1 ended
shuting down consumer : 2
Exit Chan Pointer = 0xc00005a718
have to exit now
Consumer 2 ended
shuting down consumer : 3
Exit Chan Pointer = 0xc00005a758
have to exit now
Consumer 3 ended
SIGNAL : urgent I/O condition
SIGNAL : urgent I/O condition

Thử tắt luôn con app

SIGNAL :  urgent I/O condition
SIGNAL : urgent I/O condition
SIGNAL : hangup
shuting down consumer : 0
Exit Chan Pointer = 0xc00019a018
have to exit now
Consumer 0 ended

Trước mắt thì có vẻ nó hoạt động ổn, mình sẽ test cẩn thận trước khi cho lên production. Lâu quá không viết bài nên mình tranh thủ luôn.
File code mình để ở đây,các bạn thấy hay thì start cho mình với nha https://gist.github.com/dongnguyenltqb/5a4ed0e2dc7d156e3fe6d8dfd064faff

Cảm ơn các bạn đã ghé và đọc bài viết của mình, hihi

--

--

No responses yet