Reactive programming in Java con RxJava

Mattepuffo's logo
Reactive programming in Java con RxJava

Reactive programming in Java con RxJava

RxJava è una implementazione per Java di ReactiveX, un API per la programmazione asincrona.

In questo articolo vediamo un esempio del suo utilizzo.

Invieremo delle richieste tramite WebSocket; purtroppo non posso mettervi l'url che ho usato io perchè non è pubblico.

Detto ciò, per installare la libreria se usate Maven:

<dependency>
    <groupId>io.reactivex.rxjava2</groupId>
    <artifactId>rxjava</artifactId>
    <version>2.2.21</version>
</dependency>
<dependency>
    <groupId>com.squareup.okhttp3</groupId>
    <artifactId>okhttp</artifactId>
    <version>4.9.1</version>
</dependency>

Abbiamo installato anche una libreria per l'invio di richieste tramite HTTP; ovviamente potete usare quella che volete.

Questo il codice di esempio:

import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.schedulers.Schedulers;
import okhttp3.*;
import okio.ByteString;

public class Main {
    public static void main(String[] args) {
        String url = "";
        OkHttpClient client = new OkHttpClient();
        Request request = new Request.Builder().url(url).build();

        Observable<String> observable = Observable.create(emitter -> {
            WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() {
                @Override
                public void onOpen(WebSocket webSocket, okhttp3.Response response) {
                    System.out.println("CONNESSIONE APERTA");
                }

                @Override
                public void onMessage(WebSocket webSocket, String text) {
                    emitter.onNext(text);
                }

                @Override
                public void onMessage(WebSocket webSocket, ByteString bytes) {
                }

                @Override
                public void onClosing(WebSocket webSocket, int code, String reason) {
                    webSocket.close(code, reason);
                }

                @Override
                public void onClosed(WebSocket webSocket, int code, String reason) {
                    emitter.onComplete();
                }

                @Override
                public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) {
                    emitter.onError(t);
                }
            });

            emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1, "CHIUSURA WEBSOCKET")));
        });

        observable
                .subscribeOn(Schedulers.io())
                .subscribe(new Observer<>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        System.out.println("SOTTOSCRIZIONE");
                    }

                    @Override
                    public void onNext(String event) {
                        System.out.println("EVENTO: " + event);
                    }

                    @Override
                    public void onError(Throwable e) {
                        System.out.println("ERRORE: " + e.getMessage());
                    }

                    @Override
                    public void onComplete() {
                        System.out.println("FINITO");
                    }
                });

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        }
    }
}

Enjoy!


Condividi

Commentami!