[Java Concurrent] CountDownLatch và CyclicBarrier
Java
155
concurrency model
6
synchronization
2
White

Nguyễn Tuấn Anh viết ngày 26/06/2019

Trong bài viết này, chúng ta sẽ tìm hiểu java.util.concurrent.CountDownLatchjava.util.concurrent.CyclicBarrier. Đây là 2 synchronizer bổ trợ cho synchronization (đồng bộ hóa) trong Java.


CountDownLatch

Nguyên lý hoạt động

Lớp CountDownLatch cho phép chúng ta bắt đầu thực hiện một thread X ngay sau khi tất cả các thread A1, A2, A3, ... đều đã hoàn thành.

CountDownLatch sử dụng một biến đếm nội bộ. Khác với Semaphore, biến đếm này chỉ giảm mà không tăng và khi giảm đến 0 thì sẽ dừng. Giá trị khởi tạo của biến đếm được truyền vào từ hàm khởi tạo: CountDownLatch(int count).

Mỗi khi một thread A1, A2, A3, ... hoàn thành, phương thức countDown() sẽ được gọi để giảm biến đếm đi 1. Bên cạnh đó, phương thức await() được gọi để block cho đến khi biến đếm giảm xuống còn 0. Khi đó, thread X chính thức được bắt đầu.

Ví dụ:

MainTask.java

public class MainTask implements Runnable {

    @Override
    public void run() {
        System.out.println("Start main task...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Done main task!");
    }

}

SubTask.java

import java.util.concurrent.CountDownLatch;

public class SubTask implements Runnable {

    private CountDownLatch countDownLatch;

    public SubTask(CountDownLatch countDownLatch) {
        this.countDownLatch = countDownLatch;
    }


    @Override
    public void run() {
        System.out.println("Start sub task...");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("Done sub task!");
        countDownLatch.countDown();
    }

}

Test.java

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {

    public static void main(String[] args) {
        CountDownLatch countDownLatch = new CountDownLatch(3);

        ExecutorService executorService = Executors.newFixedThreadPool(3);
        executorService.submit(new SubTask(countDownLatch));
        executorService.submit(new SubTask(countDownLatch));
        executorService.submit(new SubTask(countDownLatch));

        try {
            countDownLatch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(countDownLatch.getCount());
        executorService.submit(new MainTask());

        executorService.shutdown();
    }

}

Kết quả:

Start sub task...
Start sub task...
Start sub task...
Done sub task!
Done sub task!
Done sub task!
0
Start main task...
Done main task!

CyclicBarrier

Nguyên lý hoạt động

Tên gọi của lớp CyclicBarrier đã nói lên đưọc nguyên lý hoạt động của lớp này:

  • barrier: CyclicBarrier cho phép các thread đợi nhau để cùng tiếp cận một điểm chặn (barrier) chung. CyclicBarrier rất hữu ích đối với các chương trình có một số lượng thread cố định phải đợi nhau thì mới có thể xử lý tiếp được.
  • cyclic: CyclicBarrier có thể tái sử dụng sau khi các thread đã được giải phóng.

CyclicBarrier cũng có một biến đếm nội bộ tương tự như của CountDownLatch, chỉ khác là biến đếm của CyclicBarrier có thể reset về giá trị khởi tạo ban đầu. Giá trị khởi tạo ban đầu của biến đếm, hay số lượng thread tối đa có thể tiếp cận barrier, được truyền vào từ hàm khởi tạo: CyclicBarrier(int parties).

Mỗi khi một thread hoàn thành xong công việc đưọc yêu cầu, nó sẽ gọi phương thức await() để "tham gia" vào quá trình đợi các thread khác cũng hoàn thành xong công việc. Sau khi tất cả các thread đều đã await(), đoạn mã dưới await() ở mỗi thread mới được tiếp tục thực hiện.

Ngoài ra, chúng ta còn có thể khai báo một hành động Runnable barrierAction sẽ xảy ra khi tất cả các thread tiếp cận barrier, thông qua hàm khởi tạo: CyclicBarrier(int parties, Runnable barrierAction).

Ví dụ:

Chúng ta sẽ minh họa CyclicBarrier thông qua ví dụ sau: một hệ thống microservice gồm có 3 môi trường. Mỗi môi trường gồm có 3 service. Cả 3 service đều phải chờ nhau khởi động thành công (barrier) thì mới có thể tiếp nhận request từ client.

Service.java

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Service implements Runnable {

    private String serviceName;
    private CyclicBarrier cyclicBarrier;

    public Service(String serviceName, CyclicBarrier cyclicBarrier) {
        this.serviceName = serviceName;
        this.cyclicBarrier = cyclicBarrier;
    }

    @Override
    public void run() {
        System.out.println("Service " + serviceName + " started...");
        try {
            cyclicBarrier.await();
        } catch (InterruptedException | BrokenBarrierException e) {
            e.printStackTrace();
        }
        System.out.println("Service " + serviceName + " was available to accept request");
    }
}

Test.java

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Test {

    private static final int ENVIRONMENTS = 3;
    private static final int SERVICES = 3;

    public static void main(String[] args) {
        CyclicBarrier cyclicBarrier = new CyclicBarrier(SERVICES, () -> {
            System.out.println("Done one environment");
        });

        ExecutorService executor = Executors.newFixedThreadPool(ENVIRONMENTS);
        for (int i = 1; i <= ENVIRONMENTS; i++) {
            executor.submit(new Service("A" + i, cyclicBarrier));
            executor.submit(new Service("B" + i, cyclicBarrier));
            executor.submit(new Service("C" + i, cyclicBarrier));
        }

        executor.shutdown();
    }

}

Kết quả:

Service A1 started...
Service C1 started...
Service B1 started...
Done one environment
Service B1 was available to accept request
Service A1 was available to accept request
Service A2 started...
Service B2 started...
Service C1 was available to accept request
Service C2 started...
Done one environment
Service C2 was available to accept request
Service A2 was available to accept request
Service B2 was available to accept request
Service C3 started...
Service A3 started...
Service B3 started...
Done one environment
Service B3 was available to accept request
Service C3 was available to accept request
Service A3 was available to accept request
Bình luận


White
{{ comment.user.name }}
Bỏ hay Hay
{{comment.like_count}}
Male avatar
{{ comment_error }}
Hủy
   

Hiển thị thử

Chỉnh sửa

White

Nguyễn Tuấn Anh

24 bài viết.
151 người follow
Kipalog
{{userFollowed ? 'Following' : 'Follow'}}
Cùng một tác giả
White
45 41
MyContact là một ứng dụng mà mình thường viết mỗi khi học một ngôn ngữ hay công nghệ mới. MyContact chỉ là một ứng dụng CRUD đơn giản, cho phép ngư...
Nguyễn Tuấn Anh viết hơn 2 năm trước
45 41
White
27 14
Hướng dẫn lập trình Spring Security Trong bài viết lần này, mình sẽ giúp các bạn bước đầu tìm hiểu (Link) thông qua xây dựng các chức năng: Đăng ...
Nguyễn Tuấn Anh viết hơn 2 năm trước
27 14
White
15 1
Giới thiệu Spring Framework Trong bài viết này, mình sẽ giới thiệu cho các bạn về một trong những Java EE framework rất nổi bật và phổ biến hiện n...
Nguyễn Tuấn Anh viết hơn 2 năm trước
15 1
Bài viết liên quan
White
0 0
Đối với những chương trình đa luồng (multithreads) thường gặp những trường hợp khi nhiều luồng cùng muốn truy cập vào 1 dữ liệu nên có thể gây ra c...
Hùng Nguyễn Văn viết 1 ngày trước
0 0
{{like_count}}

kipalog

{{ comment_count }}

bình luận

{{liked ? "Đã kipalog" : "Kipalog"}}


White
{{userFollowed ? 'Following' : 'Follow'}}
24 bài viết.
151 người follow

 Đầu mục bài viết

Vẫn còn nữa! x

Kipalog vẫn còn rất nhiều bài viết hay và chủ đề thú vị chờ bạn khám phá!