Queue, BlockingQueue, SynchronousQueue trong java và ứng dụng
Java
84
White

huydx viết ngày 05/06/2017

Bài viết của mình sẽ nói về những khái niệm cơ bản về Queue nói chung và BlockingQueue/SynchronousQueue nói riêng, và tại sao lại cần những cấu trúc dữ liệu này. Đầu tiên chúng ta sẽ nói về một thứ rất rất cơ bản, đó là Queue.

Queue

Chắc hẳn bạn nào đã từng học qua môn cấu trúc dữ liệu và giải thuật sẽ biết đến cấu trúc Queue. Khác với Stack mô tả một hàng đợi dạng LIFO (Last in First out), tức là anh nào vào cuối cùng sẽ được chui ra đầu tiên, thì Queue lại mô tả hàng đợi dạng FIFO (First In First Out), tức là anh nào vào đầu tiên sẽ chui ra đầu tiên.
Các bạn có thể thấy Queue mô tả một cách rất tự nhiên hàng đợi của con người trong cuộc sống hàng ngày
alt text

Trong java thì có rất nhiều loại Queue khác nhau implement cùng một Queue interface.

Throw Exception Not throw Exception Ý nghĩa
Insert add(e) offer(e) Thêm một phần tử
Remove remove(e) poll(e) Xoá một phần tử
Examine element() peek(e) Lấy ra một phần tử mà không xoá

Bạn có thể thấy interface này hơi khó hiểu một chút là để làm cùng một việc là "cho dữ liệu vào queue" có tới tận 2 interface là addoffer. Điểm khác nhau chỉ đơn giản là khi bạn sử dụng một queue có giới hạn số phần tử, thì nếu số phần tử vượt quá giới hạn add sẽ ném Exception còn offer thì không. Cá nhân mình hay sử dụng offerpoll bởi handle Exeption sẽ biến logic của bạn khá là khó nhìn.

Thông thường thì queue chỉ support để thêm phần tử vào đầu (head) của queue hiện tại. Tuy nhiên từ bản 6 thì java đã hỗ trợ thêm một interface gọi là Deque, mà interface này sẽ support cả thêm phần tử vào đuôi với các method như addLast hay offerLast.

Ứng dụng của Queue

Ứng dụng của queue trong thực tế thì rất rất nhiều, tuy nhiên mình sẽ đưa ra một vài ứng dụng khá gần gũi mà bản thân mình đã từng gặp qua rồi:

  • Buffer: Chắc nghe đến 2 từ này nhiều bạn đã Ồ ra rồi nhỉ. Ứng dụng gặp rất nhiều trong thực tế chính là buffer. Chắc bạn nào đã từng làm ruby thì sẽ biết đến một web server gọi là Unicorn. Điểm khó của một web/http server là khi có một "slow" client request (hay là một request đến từ một thiết bị có tốc độ kết nối/xử lý rất chậm, như một chiếc điên thoại sử dụng 3G ở VN chẳng hạn :trollface: ) , thì web server sẽ phải "keep alive" connection đến client đó, và đợi cho đến khi client đó nhận được kết quả trả về! Unicorn xử lý những slow client đó bằng một cách khá thú vị là không thèm đợi những thằng slow client đó, mà dành resource cho những thằng khác. Cụ thể là Unicorn sẽ sử dụng buffer thông qua một reverse proxy như nginx, để tạm "giữ" những thằng slow client ở một hàng đợi, mà chúng ta hãy gọi bằng cái tên mĩ miều là 'buffer'.
  • Mô hình Producer-Consumer : Mô hình này hay được sử dụng trong các hệ thống phân tán, khi mà công việc sẽ được tạo bởi một loạt các tiến trình gọi là Producer, và công việc đó sẽ được xử lý bởi một loạt các tiến trình khác gọi là Consumer. Producer và Consumer có thể nằm trên các máy khác nhau, có thể nằm trên cùng một máy. Vấn đề là làm thế nào chúng ta có thể chuyển công việc từ Producer đến Consumer!! Chắc có nhiều bạn sẽ nghĩ đến việc tạo một instance của producer và pass công việc thông qua parameter, nhưng liệu một solution như thế có thể scale được trên nhiều máy、hay chỉ đơn giản là producer của chúng ta là nhiều Thread khác nhau??? Và chính bài toán này đã đưa chúng ta đến một khái niệm mới gọi là BlockingQueue

alt text

BlockingQueue / SynchronousQueue

BlockingQueue chính là cấu trúc dữ liệu nhằm để giải quyết bài toán producer và consumer mình vừa nói ở trên, và cụ thể hơn là trong môi trường Multi Threaded khi mà nhiều Producers nằm trên nhiều Thread đều muốn "nhét" dữ liệu vào cùng 1 queue. Thao tác trong môi trường multi threaded là một việc không hề dễ dàng, và BlockingQueue giúp chúng ta làm việc đó dễ dàng hơn nhiều.

Hãy thử so sánh nếu không có BlockingQueue chúng ta sẽ phải xử lý thế nào. Đầu tiên chúng ta sẽ implement một thao tác tối thiểu là add vào queue.

public class MultiThreadQueueExample {

    public static void main(String[] args) throws Exception {

        LinkedList queue = new LinkedList(1024);

        Producer producer = new Producer(queue);
        Consumer consumer = new Consumer(queue);

        new Thread(producer).start();
        new Thread(consumer).start();

        Thread.sleep(4000);
    }
}

public class Producer implements Runnable{

    protected LinkedList queue = null;

    public Producer(LinkedList queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            synchronized {                  //this is suck!!!! and slow!!
                queue.put("1");
                Thread.sleep(1000);
            }

            synchronized {                  //this is suck!!!! and slow!!
                queue.put("2");
                Thread.sleep(1000);
            }

            synchronized {                  //this is suck!!!! and slow!!
                queue.put("3");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

public class Consumer implements Runnable{

    protected LinkedList queue = null;

    public Consumer(LinkedList queue) {
        this.queue = queue;
    }

    public void run() {
        try {
            System.out.println(queue.take());
            System.out.println(queue.take());
            System.out.println(queue.take());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Để thao tác với một cấu trúc dữ liệu chung trong môi trường song song thì chúng ta cần đồng bộ những thao tác với dữ liệu đó, mà cụ thể ở đây chính là queue. Việc đồng bộ này không những phiền toái mà còn rất tốn resource.
Ngoài ra bạn nếu bạn cần thêm một số thao tác như: khi queue đầy thì producer sẽ không cố gắng để "nhét" thêm hàng vào queue nữa, thay vào đó sẽ đợi (block) cho đến khi queue được làm trống bớt bởi consumer, thì bạn sẽ phải implement khá là loằng ngoằng bằng một logic đại loại như:

public class YourQueue<E> extends LinkedList<E> {
 private int size; //Bạn phải quản lý size của queue
 private int bound; //Bạn phải tự quản lý max size của queue.... orz 
 private boolean isFull() { return size > bound; }
 @Override
 public take() { super.take(); }
 .....
}

//và bạn phải tự viết logic check queue full và block
while (true) {
    if (queue.isFull) { Thread.sleep(1000); } //chỗ này hiện không có synchronize, bạn phải tự sync sau đó nếu queue của bạn là multi thread (oh my god...)
    else {
        queue.add(elem); // và ở đây cũng cần lock .... orz
    }
}

Nhìn những comment ở trên bạn đủ thấy phiền phức thế nào chỉ với một yêu cầu nhỏ là đợi khi queue đầy. Chính đây là lúc BlockingQueue sẽ giải cứu cho bạn!
BlockingQueue có những tính năng rất hữu ích như:

  • Thread safe khi nhét dữ liệu vào và lấy dữ liệu ra
  • Khi queue chạm ngưỡng thì hành vi nhét dữ liệu vào sẽ bị blocked cho đến khi producer giải phóng bớt dữ liệu ra
  • Khi queue rỗng thì consumer sẽ bị blocked cho đến khi producer nhét chút gì vào

Quả thực là quá tiện. Bạn chỉ cần thay thế s/LinkedList/ArrayBlockingQueue/ ở đoạn code đầu tiên là sẽ không phải thao tác thêm bất cứ hành động synchronize gì cả.

Vậy bạn đã biết BlockingQueue được dùng thế nào, thế còn SynchronousQueue? Về cơ bản SynchronousQueue cũng giống BlockingQueue ở việc giải quyết bài toán hàng đợi cho mô hình producer/consumer, tuy nhiên có một điểm khác là SynchronousQueue sẽ block hành vi nhét dữ liệu vào khi có hành vi lấy dữ liệu ra tương ứng. Tức là SynchronousQueue chỉ cho phép throughput là 1 item trong khi BlockingQueue cho phép buffer với giới hạn trên là N.
SynchronousQueue hữu hiệu trong các bài toán mà bạn biết producer sẽ cho ra dữ liệu với tốc độ tương đương với consumer mà hệ thống của bạn lại cần tiết kiệm memory ở mức lớn nhất có thể

Kết luận

Thao tác với hệ thống song song rất phiền phức và dễ gây lỗi. Rất may mắn là java với bộ thư viện concurrent đã làm giúp cho chúng ta rất nhiều thứ, giúp cho việc tính toán song song trở nên đơn giản hơn nhiều. Nếu có thời gian mình sẽ tiếp tục giới thiệu về các data structure khác trong bộ thư viện concurrent chuẩn của java.

Bình luận


White
{{ comment.user.name }}
Bỏ hay Hay
{{comment.like_count}}
Male avatar
{{ comment_error }}
Hủy
   

Hiển thị thử

Chỉnh sửa

White

huydx

118 bài viết.
1046 người follow
Kipalog
{{userFollowed ? 'Following' : 'Follow'}}
Cùng một tác giả
White
164 15
Introduction (Link) là một cuộc thi ở Nhật, và cũng chỉ có riêng ở Nhật. Đây là một cuộc thi khá đặc trưng bởi sự thú vị của cách thi của nó, những...
huydx viết 2 năm trước
164 15
White
146 14
Một ngày đẹp trời, bạn quyết định viết một dịch vụ web dự định sẽ làm thay đổi cả thế giới. Dịch vụ của bạn sẽ kết nối tất cả các thiết bị di động ...
huydx viết 2 tháng trước
146 14
White
133 15
Happy programmer là gì nhỉ, chắc ai đọc xong title của bài post này cũng không hiểu ý mình định nói đến là gì :D. Đầu tiên với cá nhân mình thì hap...
huydx viết hơn 3 năm trước
133 15
Bài viết liên quan
White
0 0
Trong bài viết này, một số hình ảnh hoặc nọi dung có thể bị thiếu do quá trình chế bản. Vui lòng xem nội dung ở blog gốc sau: (Link) (Link), chúng...
programmerit viết 3 năm trước
0 0
Male avatar
9 5
Facade Design Patern Facade Patern thuộc vào họ mô hình cấu trúc (structural patern). Facade patern phát biểu rằng : "just provide a unified an...
DuongVanTien viết 2 năm trước
9 5
White
4 2
Một số hàm giúp mapping data đơn giản với Groovy Ví dụ với list data như sau: def student1 = name: "Huan", age: 22, gender: "male"] def student...
Tất Huân viết 4 tháng trước
4 2
{{like_count}}

kipalog

{{ comment_count }}

bình luận

{{liked ? "Đã kipalog" : "Kipalog"}}


White
{{userFollowed ? 'Following' : 'Follow'}}
118 bài viết.
1046 người follow

 Đầu mục bài viết

Vẫn còn nữa! x

Kipalog vẫn còn rất nhiều bài viết hay và chủ đề thú vị chờ bạn khám phá!