Skip to content
Dave Moten edited this page Nov 13, 2016 · 29 revisions

#rxjava-jdbc rewrite for RxJava 2 These are some of the aims of the rewrite:

  • leverage RxJava 2 features including more explicit types (Completable, Single, Maybe, Flowable, Observable), new creation methods, better performance, improved API.
  • aim support at Flowable because backpressure can be applied in database interactions (for example result set paging)
  • support transactions across asynchronous boundaries
  • support returning generated keys
  • support batching
  • reactive connection pools - connection pools for example c3po or Hikari provide blocking semaphore access to groups of connections. Blocking for access to jdbc Connection objects is not in keeping with reactive style so an rx connection pool is proposed (draft already written, tests required).

Transaction support

rxjava-jdbc supported transactions but relied on synchronous processing and ThreadLocals to hold context. This meant that a bunch of operators could not be applied within the stream for fear of stuffing up access to the ThreadLocal. The rewrite will provide support for using Transactions across asynchronous boundaries at the expense of a slightly more verbose api.

###Example Across asynchronous boundaries we need to pass a Tx object. A select may not return any results but we still may want to have access to the transaction so we can for instance do another call within the same transaction. Tx contains an atomic counter so we know when we can finalize the transaction with commit or rollback.

Tx is like a Notification object:

public interface Tx<T> {
   boolean isValue();
   boolean isComplete();
   boolean isError();
   T value();
   Throwable throwable();
   Connection connection();
   void commit();
   void rollback();
}

Example usage:

    // use a new transaction for the select/update
    db.transacted()
      .select("select name from person where score > 1")
      // don't care about valueless Completed Tx<String> emissions
      .valuesOnly()
      // return the query results as Flowable<Tx<String>>
      .getAs(String.class)
      // for each value
      .flatMap(tx -> 
          db.in(tx)
            .select("select name, score from person where name=:name")
            .param("name", tx.value())
            .getAs(Double.class, String.class)
            // not going to pass the transaction on further
            // so let's make life simple by dealing only with 
            // the values from the select query
            .flatMap(Tx.TO_VALUE)
            .doOnNext(System.out::println)
            // on termination will decrement Tx counter
            // cross an asynchronous boundary at subscribe time!
            .subscribeOn(Schedulers.io())
       )
      // note that on termination will decrement Tx counter
      // and if 0 will commit/rollback as appropriate 
      .subscribe();
Clone this wiki locally