LongAdder và Golang

LongAdder sử dụng để làm gì

LongAdder xuất hiện từ JDK 8, với mục đích phục vụ bạn đếm một cái gì đó, cũng giống như chức năng tăng/giảm giá trị của AtomicLong . Tuy nhiên AtomicLong rộng hơn, không chỉ đếm mà còn lưu giá trị bất kì khi cần thiết, giống như một biến thông thường.

What! Chỉ đơn giản vậy sao? Thật ra cũng đúng là đơn giản thật!

Tuy nhiên, việc đếm một cái gì đó đôi khi lại là cốt lõi của nhiều vấn đề hay gặp.

Chẳng hạn bạn muốn nghịch ngợm xem google search có nhanh không, bạn make một loạt request đồng thời tới google search. Bạn phát hiện là một vài request bị drop trong số đó. Vậy chắc cần phải đếm xem có bao nhiêu bị drop, bao nhiêu thành công chứ nhỉ? Oh vậy là bạn cần đếm rồi.

Với production thì việc đếm thành công hay thất bại của request giữa các subsystem/micro service có ý nghĩa rất quan trọng. Nó giúp bạn phát hiện các vấn đề của hệ thống, đo đạc reliability, đo đạc perf. Và có nhiều ứng dụng khác nữa của việc đếm mà mình xin phép không liệt kê ở đây.

Đếm là dễ hay khó

Với single thread/single routine thì quá đơn giản, chỉ một biến count là đủ.

Với trường hợp count được shared giữa các thread/các routine, câu chuyện trở nên phức tạp hơn rất rất nhiều để đảm bảo tính chính xác của việc đếm.

Để tìm hiểu sâu hơn, rõ hơn, mình expect các bạn tham khảo 02 lecture này:

Để giải quyết bài toán đếm, bạn có nhiều cách:

  • mutex / semaphore với 1 key (mutex khác với semaphore nhé, các bạn có thể search around)
  • lock free with atomic

Mutex hay semaphore trong trường hợp này khiến các thread đang đợi để access vào biến count phải sleep và được wake up sau. Việc này gây context switching, lãng phí cpu cycle.

Lock free với atomic giúp bạn giải quyết vấn đề này. Các thread không phải sleep rồi wake up mà liên tục kiểm tra biến count thử thay đổi giá trị (tăng/giảm) của count nếu điều kiện được CAS (compare and swap) được thỏa mãn.

var count int64
for {
   if tmp := atomic.LoadInt64(&count); atomic.CompareAndSwapInt64(&count, tmp, tmp + 1) {
      return "Success"
   }
}

Vì việc tăng/giảm biến count diễn ra rất rất nhanh, do đó sử dụng atomic trong trường hợp này sẽ giúp performance cải thiện đáng kể.

Tuy nhiên tình trạng contention vẫn diễn ra khi các thread cứ ganh đua nhau liên tục CAS trên biến count.

LongAdder shining

tl;dr

LongAdder trong JDK đưa ra một cách tiếp cận khá thông minh. Thay vì CAS chỉ trên 1 biến, ta CAS trên nhiều biến, gọi là cell, để hạn chế contention và tổng của các cell sẽ cho ta kết quả cuối cùng. Brilliant!

Tuy nhiên chúng ta sẽ gặp tiếp một vấn đề nữa. Đó là distribution, từ một tập hợp nhiều cells, ta chọn ra được một cell nào đó để thread A đang chạy có thể CAS nhanh chóng và ít contention nhất.

Random Distribution và Striped64

Trước hết các bạn đọc qua bài viết của bạn Nguyễn Hồng Phúc để hiểu qua về cách mà LongAdder vận hành cũng như cấu trúc dữ liệu để lưu trữ nhiều Cells nhé.

Cells đơn giản là một hash table, primitive array. Ban đầu, LongAdder sẽ thử CAS trên biến base và kiểm tra contention đã xảy ra hay chưa. Nếu có contention trên biến base hoặc xung đột đã xảy ra trước đó, LongAdder sẽ sử dụng longAccumulate trong Striped64 để phân phối việc thay đổi giá trị, hạn chế contention.

Striped64 sử dụng ngay một thứ có sẵn trong Java Thread để chọn ra cell. Đó là threadLocalRandomProbe. Nếu giá trị ban đầu của threadLocalRandomProbe là 0, nó sẽ được khởi tạo giá trị mới bằng ThreadLocalRandom.

final void longAccumulate(long x, LongBinaryOperator fn, bolean wasUncontended) {
   int h;
   if ((h = getProbe()) == 0) {
      ThreadLocalRandom.current(); // force initialization
      h = getProbe();
      wasUncontended = true;
   }
   ...
}

threadLocalRandomProbe chính là hashed key ban đầu sử dụng để xác định index của cell trong hash table mà không cần sử dụng hashing function.

static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

// Cái code này là mình viết ra để bạn hiểu
cellIndex = getProbe() % hashTableSize 

// Hoặc viết theo kiểu bit operation nhanh hơn, thú vị hơn
cellIndex = getProbe() & (hashTableSize - 1) 

// Vì size của HashTable luôn có dạng 2^K, 
// nên phép % thực chất là phép bit như bạn thấy.
// Cũng vì để tránh phép chia mà quy tắc đặt ra: 
//        Kích thước của hash table luôn phải là 2^K.

// Code thực sự nó thế này
if ((as = cells) != null && (n = as.length) > 0) {
    if ((a = as[(n - 1) & h]) == null) { // h chính là cái probe, còn n là size/length của cái hash table
          ...
    }
 }

Trong bài viết này mình sẽ không nói về threadLocalRandomProbe. Đây là một field khá đặc biệt, các bạn có thể tìm hiểu thêm về nó và ThreadLocalRandom, PRNGs trong Thread.java nhé.

Tiếp tục câu chuyện của chúng ta.

Nếu threadLocalRandomProbe là một hằng số, suy ra thread đó sẽ gần như mãi mãi chỉ có thể access vào một index cố định và contention vẫn xảy ra đều đặn nếu trời xui khiến các probe này randomly nhưng lại đồng dư theo module n, n là length của hash table.

Vậy:

  1. Nếu n thay đổi (kích thước của mảng thay đổi) thì sẽ giúp việc distribution maybe trở nên tốt hơn khi xác suất đồng dư thấp hơn.
  2. Thay đổi giá trị của threadLocalRandomProbe

Striped64 thực hiện cả hai điều trên theo những kịch bản xác định.

Striped64.longAccumulate (P1) (trích từ JDK 9)

Mình sẽ tiến hành giải phẫu hàm này một chút. Logic của hàm này thật ra không quá phức tạp. Nhưng xử lý nó lại phức tạp và tỉ mỉ, cần có thứ tự để đảm bảo thao tác CAS thực hiện đúng.

// Điều kiện tổng, bạn sẽ gặp rẽ nhánh của nó ở P3
if ((as = cells) != null && (n = as.length) > 0) { 

// Bên trong điều kiện tổng:
if ((a = as[(n - 1) & h]) == null) {
   if (cellsBusy == 0) {       // Try to attach new Cell
         Cell r = new Cell(x);   // Optimistically create
         if (cellsBusy == 0 && casCellsBusy()) {
            boolean created = false;
            try {  // Recheck under lock
                Cell[] rs; int m, j;
                if ((rs = cells) != null 
                   && (m = rs.length) > 0 
                   && rs[j = (m - 1) & h] == null) {
                        rs[j] = r;
                        created = true;
                 }
            } finally {
                 cellsBusy = 0;
            }

            if (created) break;
            continue;           // Slot is now non-empty
        }
     }
     collide = false;
}

Hiểu đơn giản là nếu cái cell đang point tới là null, tức là nhiều khả năng chúng ta đang point tới một Cell mới do hash table vừa mới expand/double size lên.

Có 3 điều cực kì cực kì hay ở code trên:

  1. Tác giả luôn expect chuyện tốt lành xảy ra, vì vậy các câu lệnh if hướng tới nó. Tại sao lại như vậy? Là vì để tối ưu branch predictor trong CPU (ref: Branch Predictor, trong đó có đoạn viết: In order to effectively write your code to take advantage of these rules, when writing if-else or switch statements, check the most common cases first and work progressively down to the least common.). Thực tế khi đã point tới một null Cell rồi thì xác suất cao là có thể thực hiện được việc gán giá trị cho cell mới. WOW! AWESOME!
  2. Tác giả coi bước check lock ban đầu cellsBusy == 0 và bước if (cellsBusy == 0 && casCellsBusy()) là không giống nhau. Vì với multithreading, cellsBusy ở bước đầu nhưng sang tới bước sau thì đã bị thay đổi bởi một thread khác.
  3. Và cuối cùng if ((rs = cells) != null && (m = rs.length) > 0 && rs[j = (m - 1) & h] == null), check lại cell một lần nữa, xem có đúng đây là cell mới (null) hay không hay thread nào đó đã thay đổi rồi.

Striped64.longAccumulate (P2)

                else if (!wasUncontended)       // CAS already known to fail
                    wasUncontended = true;      // Continue after rehash
                else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                             fn.applyAsLong(v, x))))
                    break;
                 else if (n >= NCPU || cells != as)
                    collide = false;            // At max size or stale
                else if (!collide)
                    collide = true;
                else if (cellsBusy == 0 && casCellsBusy()) {
                    try {
                        if (cells == as) {      // Expand table unless stale
                            Cell[] rs = new Cell[n << 1];
                            for (int i = 0; i < n; ++i)
                                rs[i] = as[i];
                            cells = rs;
                        }
                    } finally {
                        cellsBusy = 0;
                    }
                    collide = false;
                    continue;                   // Retry with expanded table
                }
  • Nếu ban đầu (lúc gọi hàm longAccumulate) đã bị đánh giá là đang chiến tranh lạnh (contended) với thread khác thì chờ next loop, và thay đổi luôn probe để tránh việc gây hấn trên cell này. Bạn sẽ thấy hàm sau được gọi nếu điều kiện !wasUncontended xảy ra: h = advanceProbe(h) để thay đổi giá trị của threadLocalRandomProbe như chúng ta đã đề cập ở trên.
  • Nếu không, thử CAS, nếu thành công thì return luôn, nếu không thì thực hiện một loạt các phép thử về việc kiểm tra số cell có vượt ngưỡng hay không. Nếu các điều kiện này xảy ra thì sẽ thử CAS ở vòng lặp sau, và vẫn gọi hàm h = advanceProbe(h)
  • Nhánh cuối cùng là đặc biệt nhất: chúng ta có đủ điều kiện để mở rộng (double size) số cells. Sau khi double xong, không cần thay đổi probe của thread nữa.

Striped64.longAccumulate (P3)

// Đây là rẽ nhánh của điều kiện tổng ở P1
// Mục tiêu là để khởi tạo hash table cell. 

// Nếu khởi tạo thành công thì không thoát luôn khỏi hàm longAccumulate.
// Nếu không thì lại thử CAS trên base một lần nữa
            else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                boolean init = false;
                try {                           // Initialize table
                    if (cells == as) {
                        Cell[] rs = new Cell[2];
                        rs[h & 1] = new Cell(x);
                        cells = rs;
                        init = true;
                    }
                } finally {
                    cellsBusy = 0;
                }
                if (init)
                    break;
            }
            else if (casBase(v = base, ((fn == null) ? v + x :
                                        fn.applyAsLong(v, x))))
                break;                          // Fall back on using base

Không có quá nhiều điều để nói về đoạn này. Tuy nhiên có một dòng code rất tinh tế:

rs[h & 1] = new Cell(x);

Tại sao phải làm vậy?

Khi khởi tạo, Có 2 Cells tất cả, và ta distribute trên này luôn thay vì đơn giản gán rs[0] = new Cell(x). Không còn gì bàn cãi về độ cẩn thận và tinh tế của tác giả.

Porting to Golang

Mình đã giải thích cặn kẽ mọi ngóc ngách của LongAdder bằng cách đi sâu hơn vào Striped64. Và mục tiêu của cuối cùng của mình chính là để porting sang Golang, mình là một gopher và Golang chưa hề có LongAdder.

Với golang, có muôn vàn khó khăn, đơn cử 2 thứ sau:

  1. Bạn không có cái gì cố định để làm probe. Bạn có thể access thread id hoặc go routine id, tuy nhiên chậm và cực kì unsafe
  2. Về mặt syntax, bạn sẽ chẳng bao giờ có cái code kiểu thế này:
casBase(v = base, ((fn == null) ? v + x : fn.applyAsLong(v, x)))

Thế nên code go sẽ dài hơn, dễ sai hơn. Nhưng bất chấp và viết cẩn thận thôi :cry:

Để solve vấn đề về probe, lựa chọn duy nhất của mình hiện tại là dùng System Time (time.Nano) để làm random probe.

Và mình có 02 patch nhỏ.

  1. Thay vì chỉ double size, mình double size nhưng x4 capacity. Vì size của hash table không quá lớn, vì vậy việc x4 trong bộ nhớ thực là khả quan, cho kết quả perf tốt. Tradeoff là tăng bộ nhớ gấp đôi so với thông thường. Bằng cách này mình tiết kiệm một lần allocation.
  2. Max number of cell, mình để con số lớn hơn thông thường, hiện tại là 128 cho phần lớn case.

Kết quả

Đây là thư viện mình viết: https://github.com/linxGnu/go-adder (Ít star lắm, các bạn star đi nhóe :sob:)

Viết xong, benchmark thử, mình sáng chế thêm cái RandomCellAdder nữa, performance cực kì tốt. Mình sẽ viết một bài khác về cái này.

Thư viện LongAdder mình porting còn để phục vụ cho một project khác nữa. Hi vọng là có thể giới thiệu tới các bạn nếu nó có kết quả tốt.

Bình luận


White
{{ comment.user.name }}
Bỏ hay Hay
{{comment.like_count}}
Male avatar
{{ comment_error }}
Hủy
   

Hiển thị thử

Chỉnh sửa

White

Linh Tran Tuan

5 bài viết.
74 người follow
Kipalog
{{userFollowed ? 'Following' : 'Follow'}}
Cùng một tác giả
White
45 23
Một trong những điểm thú vị nhất khi phát triển các hệ thống Business là lập báo cáo doanh thu. Mình đã từng maintain hệ thống cảnh báo sớm của Cụ...
Linh Tran Tuan viết 10 tháng trước
45 23
White
27 4
Bit operations Các phép toán trên bit luôn give best performance và tối giản hóa bộ nhớ. Hôm nay mình viết bài này note lại cho mọi người xài chơi...
Linh Tran Tuan viết 6 tháng trước
27 4
White
24 3
Introduction Buffering (buffered IO) là một trong những kỹ thuật kinh điển khi chúng ta cần đọc/ghi dữ liệu. Trong bài viết này mình sẽ đi sâu hơn...
Linh Tran Tuan viết 10 tháng trước
24 3
Bài viết liên quan
White
9 2
Makefile thực hiện một số thao tác thường dùng trong Go Khi làm project Go mình thường tạo một file Makefile dạng này: Lưu ý nhớ thay thành tên m...
Huy Trần viết 2 năm trước
9 2
White
16 0
Crawl dữ liệu Crawl là một vấn đề hay gặp trong quá trình làm software. Ví dụ lấy tin tức, tin giảm giá, vé xem phim... là những dạng của crawl. Mộ...
Thach Le viết hơn 2 năm trước
16 0
{{like_count}}

kipalog

{{ comment_count }}

bình luận

{{liked ? "Đã kipalog" : "Kipalog"}}


White
{{userFollowed ? 'Following' : 'Follow'}}
5 bài viết.
74 người follow

 Đầu mục bài viết

Vẫn còn nữa! x

Kipalog vẫn còn rất nhiều bài viết hay và chủ đề thú vị chờ bạn khám phá!