Part1 - Handle concurrency in Go
#go
2
#go #goroutine #scheduler
1
White

delv viết ngày 11/11/2020

What is Go ?

Golang là một ngôn ngữ được Google phát triển vào năm 2007 và lần đầu tiên ra mắt vào năm 2009, cũng khá xa so với thời điểm hiện tại. Trước đây mình chủ yếu làm các dự án về .NET, Nodejs nên đã khá quen thuộc với các khái niệm multi threading (trong .NET) và single thread (trong Nodejs). Vậy điểm khác biệt chính giữa Golang với các ngôn ngữ trên là gì ?, tài liệu dù bạn search ở bất kỳ đâu và chỗ nào cũng nhắc đến một thế mạnh của Go là xử lý concurrency chỉ với single thread (Go sử dụng single thread làm main thread cho chương trình).

Problem

Câu chuyện đưa mình đến concurrency của Golang không bắt nguồn từ việc học Golang, lúc bắt đầu mình cũng lên mạng tìm ở GoExample và đây nó dẫn mình tới cái khái niệm này "Goroutines". Đại khái để handle nhiều task mà không bị blocking thread thì Go sử dụng Goroutines và channel để giao tiếp giữa các Goroutines với nhau.

Đầu tháng 10 mình tham gia một dự án, backend hoàn toàn viết bằng Go và triển khai dưới dạng mô hình microservice. Phía client sẽ sử dụng trực tiếp gRPC Client để call đến các API của backend. Một trong những vấn đề khi tụi mình triển khai mô hình microservice là các service giao tiếp với nhau như thế nào ? . Tóm lại chỉ có 2 hình thức thôi là "Synchronous protocol and Asynchoronous protocol", với Asynchoronous protocal thì tụi mình sử dụng một message queue là NATS .
Code implement NAT client hiện tại

func RunWorker(
   ctx context.Context,
    logger *zap.Logger,
    courseStudentService *services.CourseStudentService,
    courseClassService *services.CourseClassService,
) nats.Callback {
        courseStudent := &subscriptions.CourseStudent{
            Logger:               logger,
            CourseStudentService: courseStudentService,
        }
        if err := courseStudent.Subscribe(conn); err != nil {
            return fmt.Errorf("CourseStudent.Subscribe: %w", err)
        }

        courseClass := &subscriptions.CourseClass{
            Logger:             logger,
            CourseClassService: courseClassService,
        }
        if err := courseClass.Subscribe(conn); err != nil {
            return fmt.Errorf("jCourseClass.Subscribe: %w", err)
        }
}

và đăng ký với main thread

    worker := RunWorker(ctx, zapLogger, courseStudentService, courseClassService)
    err = worker(busFactory.GetConn())
    if err != nil {
        zapLogger.Panic("err run worker", zap.Error(err))
    }
    busFactory.RegisterCallbackConnected([]nats.Callback{worker})

Với mỗi loại event nhận từ NATS thì mình sẽ đăng ký một subscription tương ứng và subscrie vào topic tương ứng, mọi thứ vẫn rất ok cho đến khi một ngày mặc dù đã publish event lên NATS xong phía subscription lại không insert data vào database. Sau một hồi tìm hiểu thì đã phát hiện ra vấn đề ở chỗ NATS cũng như Kafka đều có cơ chế để đảm bảo thứ tự các event nhận được ở phía Subscriber, tức là vd mình Publish lên NATS các event theo thứ tự Id là "1","2","3". Thì phía Subscriber sẽ nhận tuần tự "1","2", "3', việc đảm bảo thứ tự này là rất quan trọng khi cần xử lý các task tuần tự hoặc là transactions. Nhưng chính tại điểm này, nếu mình handle theo cách trên thì vd khi nhận được event với Id "1" và trong quá trình xử lý bị lỗi và phải retry thì các event đến sau sẽ bị delay mà không xử lý được và điều này là không cho phép vì hệ thống tụi mình cần xử lý nhanh nhất có thể, gần như là near-realtime.

Solution

Để xử lý vấn đề này thì đổi với .NET có lẽ các bạn sẽ nghĩ ngay đến việc sử dụng multi thread, mỗi event tới tạo 1 thread để handle sau đó tiếp tục xử lý event khác, nhưng mình đang dùng Golang vậy solution đầu tiên mình nghĩ tới là sử dụng Goroutine để handle các event này.
Về high-level thì idea như thế này. Mình sử dụng một Goroutine để handle việc subscribe vào topic và mỗi event tới sẽ tự động tạo ra một Goroutine để handle, như vậy thì việc xử lý các event có thể diễn ra một cách concurrency và hoàn toàn không phụ thuộc lẫn nhau, giả sử nếu có một event handle bị lỗi thì nó tiếp tục skip hoặc sử dụng retry-pattern để xử lý.
Ảnh mình lấy từ một bài post trên medium minh họa cơ chế Fan-out Goroutine
alt
code implement

go func() {
        _, err := eventBus.Conn.QueueSubscribe(topic, queue, func(msg *stan.Msg) {
            for _, handler := range eventBus.Subscriptions[reflect.TypeOf(event)] {
                item := handler
                concrete := reflect.ValueOf(item)
                go func() {
                    log.Printf("[Queue] Handle event seg [%s] redeliveryCount [%d] from topic [%s] with handler [%v]", strconv.Itoa(int(msg.Sequence)),
                        msg.RedeliveryCount, topic, reflect.TypeOf(item).String())
                    concrete.MethodByName("Handle").Call([]reflect.Value{reflect.ValueOf(msg), reflect.ValueOf(eventBus.Rb.InChan)})
                }()
            }
        }, stan.DurableName(queue))
        if err != nil {
            log.Printf("error when subscribe topic [%s]. Error --> %v", topic, err)
        }
    }()

Code subscriber

    eventBus.QueueSubscribe(&EventHandlers.RegisterEvent{}, []interface{}{
        &EventHandlers.RegisterEventHandler{},
        &EventHandlers.RegisterEvent2Handler{}}, "register-event-5", "queue-name",
    )
    eventBus.QueueSubscribe(&EventHandlers.LeaveClassEvent{}, []interface{}{&EventHandlers.LeaveClassEventHandler{}}, 
    "leave-class-event", "queue-name-01")

và producer

eventBus := (&EventBus.EventBus{ClientId: "client-01"}).New(10)
    index := 0
    for {
        eventBus.Publish(&EventHandlers.RegisterEvent{ClassID: strconv.Itoa(index)}, "register-event-5")
        time.Sleep(time.Duration(rand.Intn(300)*100) * time.Microsecond)
        eventBus.Publish(&EventHandlers.LeaveClassEvent{ClassID: strconv.Itoa(index)}, "leave-class-event")
        time.Sleep(time.Duration(rand.Intn(250)*100) * time.Microsecond)
        index++
    }

Kết quả khi mình chạy thử ; ) alt text

Code hiện tại mình đang store ở Repo. Tuy giải pháp ở trên đã hoạt động được nhưng vẫn còn một số khuyết điểm như số lượng Goroutine tạo ra vượt quá giới hạn phần cứng hay chưa có handle error trong trường hợp xảy ra lỗi, ở bài tiếp theo mình sẽ tiếp tục giới thiệu cơ chế handle error
DeLe 08-11-2020

Bình luận


White
{{ comment.user.name }}
Bỏ hay Hay
{{comment.like_count}}
Male avatar
{{ comment_error }}
Hủy
   

Hiển thị thử

Chỉnh sửa

White

delv

1 bài viết.
0 người follow
Kipalog
{{userFollowed ? 'Following' : 'Follow'}}
Bài viết liên quan
White
10 3
Tôi có thằng bạn, thời đi học lúc tôi học Java thì nó học C, sau ra đường kiếm sống tôi xài Python thì nó nhập hội Ruby. Gần đây thấy bảo nó lại âm...
Thinh Tran viết hơn 2 năm trước
10 3
{{like_count}}

kipalog

{{ comment_count }}

bình luận

{{liked ? "Đã kipalog" : "Kipalog"}}


White
{{userFollowed ? 'Following' : 'Follow'}}
1 bài viết.
0 người follow

 Đầu mục bài viết

Vẫn còn nữa! x

Kipalog vẫn còn rất nhiều bài viết hay và chủ đề thú vị chờ bạn khám phá!