Reactive programming in Java con RxJava
RxJava è una implementazione per Java di ReactiveX, un API per la programmazione asincrona.
In questo articolo vediamo un esempio del suo utilizzo.
Invieremo delle richieste tramite WebSocket; purtroppo non posso mettervi l'url che ho usato io perchè non è pubblico.
Detto ciò, per installare la libreria se usate Maven:
<dependency>
<groupId>io.reactivex.rxjava2</groupId>
<artifactId>rxjava</artifactId>
<version>2.2.21</version>
</dependency>
<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.1</version>
</dependency>
Abbiamo installato anche una libreria per l'invio di richieste tramite HTTP; ovviamente potete usare quella che volete.
Questo il codice di esempio:
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.disposables.Disposable;
import io.reactivex.disposables.Disposables;
import io.reactivex.schedulers.Schedulers;
import okhttp3.*;
import okio.ByteString;
public class Main {
public static void main(String[] args) {
String url = "";
OkHttpClient client = new OkHttpClient();
Request request = new Request.Builder().url(url).build();
Observable<String> observable = Observable.create(emitter -> {
WebSocket webSocket = client.newWebSocket(request, new WebSocketListener() {
@Override
public void onOpen(WebSocket webSocket, okhttp3.Response response) {
System.out.println("CONNESSIONE APERTA");
}
@Override
public void onMessage(WebSocket webSocket, String text) {
emitter.onNext(text);
}
@Override
public void onMessage(WebSocket webSocket, ByteString bytes) {
}
@Override
public void onClosing(WebSocket webSocket, int code, String reason) {
webSocket.close(code, reason);
}
@Override
public void onClosed(WebSocket webSocket, int code, String reason) {
emitter.onComplete();
}
@Override
public void onFailure(WebSocket webSocket, Throwable t, okhttp3.Response response) {
emitter.onError(t);
}
});
emitter.setDisposable(Disposables.fromRunnable(() -> webSocket.close(1, "CHIUSURA WEBSOCKET")));
});
observable
.subscribeOn(Schedulers.io())
.subscribe(new Observer<>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("SOTTOSCRIZIONE");
}
@Override
public void onNext(String event) {
System.out.println("EVENTO: " + event);
}
@Override
public void onError(Throwable e) {
System.out.println("ERRORE: " + e.getMessage());
}
@Override
public void onComplete() {
System.out.println("FINITO");
}
});
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}
}
Enjoy!
java maven reactivex rxjava
Commentami!