Java 9 Flow API example – Publisher and Subscriber
Java
215
java9
12
publisher
2
subscriber
2
Male avatar

loveprogramming viết ngày 19/05/2021

https://grokonez.com/java/java-9/java-9-flow-api-example-publisher-and-subscriber

Java 9 Flow API example – Publisher and Subscriber

In previous post, we have general knowledge about Reactive Streams and Java 9 Flow API Components and Behaviour. In this tutorial, we're gonna look at an example that implements Publisher and Subscriber for reactive programming.

Related Articles:

I. Technologies

  • Java 9
  • Eclipse with Java 9 Support for Oxygen (4.7)

    II. Project Overview

    We will create a Publisher that is subscribed by two Subscribers.
  • Publisher maintains a list of Subscriptions, each Subscription is correlative to each Subscriber above.
  • Publisher uses one Subscription to push items to correlative Subscriber by Subscriber::onNext() method.
  • Subscriber uses Subscription to request items from Publisher by Subscription::request() method.
  • Publisher defines an Executor for multi-threading. Then request() and onNext() method work asynchronously, producing data to each Subscriber by Subscription is also asynchronous.
  • After receiving all items successfully, Subscriber can request new data or cancel Subscription (random).

    III. Practice

    To understand how Publisher, Subscriber and Subscription behave and way to implementing them, please visit: Java 9 Flow API – Reactive Streams

    1. Create implementation of Publisher

    
    package com.javasampleapproach.java9flow.pubsub;

import static java.lang.Thread.currentThread;
import static java.util.concurrent.Executors.newSingleThreadExecutor;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class MyPublisher implements Publisher {

private static final String LOG_MESSAGE_FORMAT = "Publisher >> [%s] %s%n";

final ExecutorService executor = Executors.newFixedThreadPool(4);
private List<MySubscription> subscriptions = Collections.synchronizedList(new ArrayList<MySubscription>());

private final CompletableFuture<Void> terminated = new CompletableFuture<>();

@Override
public void subscribe(Subscriber<? super Integer> subscriber) {
    MySubscription subscription = new MySubscription(subscriber, executor);

    subscriptions.add(subscription);

    subscriber.onSubscribe(subscription);
}

public void waitUntilTerminated() throws InterruptedException {
    try {
        terminated.get();
    } catch (ExecutionException e) {
        System.out.println(e);
    }
}

private class MySubscription implements Subscription {

    private final ExecutorService executor;

    private Subscriber<? super Integer> subscriber;
    private final AtomicInteger value;
    private AtomicBoolean isCanceled;

    public MySubscription(Subscriber<? super Integer> subscriber, ExecutorService executor) {
        this.subscriber = subscriber;
        this.executor = executor;

        value = new AtomicInteger();
        isCanceled = new AtomicBoolean(false);
    }

    @Override
    public void request(long n) {
        if (isCanceled.get())
            return;

        if (n < 0)
            executor.execute(() -> subscriber.onError(new IllegalArgumentException()));
        else
            publishItems(n);
    }

    @Override
    public void cancel() {
        isCanceled.set(true);

        synchronized (subscriptions) {
            subscriptions.remove(this);
            if (subscriptions.size() == 0)
                shutdown();
        }
    }

    private void publishItems(long n) {
        for (int i = 0; i < n; i++) {

            executor.execute(() -> {
                int v = value.incrementAndGet();
                log("publish item: [" + v + "] ...");
                subscriber.onNext(v);
            });
        }
    }

    private void shutdown() {
        log("Shut down executor...");
        executor.shutdown();
        newSingleThreadExecutor().submit(() -> {

            log("Shutdown complete.");
            terminated.complete(null);
        });
    }

}

private void log(String message, Object... args) {
    String fullMessage = String.format(LOG_MESSAGE_FORMAT, currentThread().getName(), message);

    System.out.printf(fullMessage, args);
}

}

2. Create implementation of Subscriber

More at:

https://grokonez.com/java/java-9/java-9-flow-api-example-publisher-and-subscriber

Java 9 Flow API example – Publisher and Subscriber

Bình luận


White
{{ comment.user.name }}
Bỏ hay Hay
{{comment.like_count}}
Male avatar
{{ comment_error }}
Hủy
   

Hiển thị thử

Chỉnh sửa

Male avatar

loveprogramming

545 bài viết.
97 người follow
Kipalog
{{userFollowed ? 'Following' : 'Follow'}}
Cùng một tác giả
Male avatar
1 0
Tutorial Link: (Link) (Ảnh) Django is a Pythonbased free and opensource web framework that follows the modeltemplateview architectural pattern. A...
loveprogramming viết 7 tháng trước
1 0
Male avatar
1 0
https://loizenai.com/angular11nodejspostgresqlcrudexample/ Angular 11 Node.js PostgreSQL Crud Example (Ảnh) Tutorial: “Angular 11 Node.js Postg...
loveprogramming viết 6 tháng trước
1 0
Male avatar
1 0
Angular Spring Boot jwt Authentication Example Github https://loizenai.com/angularspringbootjwt/ (Ảnh) Tutorial: ” Angular Spring Boot jwt Authe...
loveprogramming viết 6 tháng trước
1 0
Bài viết liên quan
Male avatar
0 0
https://grokonez.com/java/java9/java9flowapireactivestreams Java 9 Flow API – Reactive Streams Java 9 introduces Reactive Streams under java.util...
loveprogramming viết 2 tháng trước
0 0
Male avatar
0 0
https://grokonez.com/javaintegration/distributedsystem/rabbitmqcreatespringrabbitmqpublishsubcribepatternspringboot RabbitMq – How to create Sprin...
loveprogramming viết 1 tháng trước
0 0
{{like_count}}

kipalog

{{ comment_count }}

bình luận

{{liked ? "Đã kipalog" : "Kipalog"}}


Male avatar
{{userFollowed ? 'Following' : 'Follow'}}
545 bài viết.
97 người follow

 Đầu mục bài viết

Vẫn còn nữa! x

Kipalog vẫn còn rất nhiều bài viết hay và chủ đề thú vị chờ bạn khám phá!