
Only a few years ago a large application had tens of servers, seconds of response time, hours of offline maintenance and gigabytes of data. Today applications are deployed on everything from mobile devices to cloud-based clusters running thousands of multi-core processors. Users expect millisecond response times and 100% uptime. Data is measured in Petabytes. Today's demands are simply not met by yesterday’s software architectures.
Responsive | React to demand |
Resilient | React to failure |
Elastic | React to load |
Message Driven | React to events |
Language bindings exist for...
Single Value | Multiple Values | |
---|---|---|
Synchronous | T | Iterable[T] |
Asynchronous | Future[T] | ? |
for (String item : items) {
// do stuff
}
Generates data to be observed
Remember those three properties of iteration?
Observable.create(subscriber -> {
try {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onComplete();
} catch (Exception e) {
subscriber.onError(e);
}
});
Observable.just(1,2,3).
map(i -> i + 1).
subscribe(System.out::println);
// 2
// 3
// 4
Observable.just(2, 1, 0, -1).
map(i -> 4 / i).
subscribe(
item -> System.out.println("Received: " + item),
err -> System.err.println("Error => " + err.getMessage()));
// Received: 2
// Received: 4
// Error => / by zero
Observable.just(3,2,1).
map(i -> i * i).
subscribe(
item -> System.out.println("Received: " + item),
err -> System.err.println("Error => " + err.getMessage()),
() -> System.err.println("Completed"));
// Received: 9
// Received: 4
// Received: 1
// Completed
Single Value | Multiple Values | |
---|---|---|
Synchronous | T | Iterable[T] |
Asynchronous | Future[T] | Observable[T] |
Makes threading much easier, but be careful!
Observable<Long> longs = Observable.just(1,2,3);
longs.subscribe(System.out::println);
// 1
// 2
// 3
longs.subscribe(System.out::println);
// 1
// 2
// 3
This is all asynchronous -- what happens if producers and consumers do not process elements at the same rate?
Observable.create(observer -> {
observer.setProducer(requested -> {
System.out.println("Requested " + requested + " more elements");
for (int i = 0; i < requested && !subscriber.isUnsubscribed(); i++) {
// Emit next items...
}
}
});
Now we get the best of both worlds
Database db = new Database(connectionProvider);
Observable<Integer> score = db
.select("select score from person where name <> ? order by name")
.parameter("XAVIER")
.getAs(Integer.class)
.last();
Observable.from("one", "two", "three", "four", "five")
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(/* an Observer */);
List<Integer> ints = Observable.just(1,2,3).
toList().toBlocking().single();
Allows assertions to be executed against subscribers in unit tests
Allows testing events by manipulating time