Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connections not released after each requests ? #50

Open
Crystark opened this issue Dec 11, 2015 · 5 comments
Open

Connections not released after each requests ? #50

Crystark opened this issue Dec 11, 2015 · 5 comments
Labels

Comments

@Crystark
Copy link

Hi,

I'm having some problem on this observable. I'm using rxjava-jdbc 0.6.8 with a min size 1 and a max size 2 for the Hikari pool.

updateObservable = Observable.interval(2, 2, TimeUnit.SECONDS)
    .onBackpressureLatest()
    .flatMap(i -> db
        .select("SELECT * FROM elements")
        .autoMap(Element.class)
    )
    .compose(service.asTransformer())
    .doOnNext(t -> System.out.println("updated " + t))
    .map(e -> Observable.<Object> just(e.part1, e.part2, new Timestamp(e.updated_at), e.id))
    .lift(db
        .update("UPDATE elements SET part1 = ?, part2 = ?, updated_at = ? WHERE id = ?")
        .parameterListOperator()
    )
    .compose(Observable::merge)
    .doOnNext(t -> System.out.println("done " + t))
    .retry((i, t) -> {
        if (i < RETRIES) {
            L.warn("Something went wrong. Retry (" + i + "/" + RETRIES + ")...", t);
            return true;
        }
        else {
            L.error("Something is still wrong after " + RETRIES + " retries. Fix it. " + NAME + " stopped.", t);
            return false;
        }
    })
    .onErrorResumeNext(t -> Observable.empty());

Here are the logs this shows

updated Element [id=169900]
done 1
updated Element [id=169901]
2015-12-11 11:38:59 WARN  ElementUpdater:66 - Something went wrong. Retry (1/100)...
com.github.davidmoten.rx.jdbc.exceptions.SQLRuntimeException: java.sql.SQLTransientConnectionException: HikariPool-0 - Connection is not available, request timed out after 30000ms.
        at com.github.davidmoten.rx.jdbc.ConnectionProviderPooled.get(ConnectionProviderPooled.java:92)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.getConnection(QueryUpdateOnSubscribe.java:126)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:79)
        at com.github.davidmoten.rx.jdbc.QueryUpdateOnSubscribe.call(QueryUpdateOnSubscribe.java:21)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorSubscribeOn$1$1.call(OperatorSubscribeOn.java:62)
        at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.enqueue(TrampolineScheduler.java:77)
        at rx.schedulers.TrampolineScheduler$InnerCurrentThreadScheduler.schedule(TrampolineScheduler.java:56)
        at rx.internal.operators.OperatorSubscribeOn$1.onNext(OperatorSubscribeOn.java:57)
        at rx.internal.operators.OperatorSubscribeOn$1.onNext(OperatorSubscribeOn.java:43)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.operators.OperatorBufferWithSize$1.onNext(OperatorBufferWithSize.java:104)
        at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
        at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
        at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
        at rx.internal.producers.ProducerArbiter.setProducer(ProducerArbiter.java:126)
        at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.setProducer(OperatorConcat.java:222)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
        at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
        at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
        at rx.observers.SerializedObserver.onNext(SerializedObserver.java:95)
        at rx.observers.SerializedSubscriber.onNext(SerializedSubscriber.java:95)
        at rx.internal.operators.OperatorConcat$ConcatInnerSubscriber.onNext(OperatorConcat.java:199)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:46)
        at rx.internal.util.ScalarSynchronousObservable$1.call(ScalarSynchronousObservable.java:35)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
        at rx.Subscriber.setProducer(Subscriber.java:211)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.subscribeNext(OperatorConcat.java:172)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:136)
        at rx.internal.operators.OperatorConcat$ConcatSubscriber.onNext(OperatorConcat.java:79)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.slowpath(OnSubscribeFromIterable.java:97)
        at rx.internal.operators.OnSubscribeFromIterable$IterableProducer.request(OnSubscribeFromIterable.java:73)
        at rx.Subscriber.setProducer(Subscriber.java:211)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:49)
        at rx.internal.operators.OnSubscribeFromIterable.call(OnSubscribeFromIterable.java:32)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable$2.call(Observable.java:162)
        at rx.Observable$2.call(Observable.java:154)
        at rx.Observable.unsafeSubscribe(Observable.java:8171)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:232)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.onNext(OperatorMerge.java:142)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.operators.OperatorDoOnEach$1.onNext(OperatorDoOnEach.java:85)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
        at rx.internal.operators.OperatorFilter$1.onNext(OperatorFilter.java:54)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onNext(OperatorOnErrorResumeNextViaFunction.java:111)
        at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:54)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.emitScalar(OperatorMerge.java:365)
        at rx.internal.operators.OperatorMerge$MergeSubscriber.tryEmit(OperatorMerge.java:327)
        at rx.internal.operators.OperatorMerge$InnerSubscriber.onNext(OperatorMerge.java:804)
        at io.reactivex.netty.protocol.http.UnicastContentSubject$AutoReleaseByteBufOperator$1.onNext(UnicastContentSubject.java:262)
        at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
        at rx.internal.operators.BufferUntilSubscriber.emit(BufferUntilSubscriber.java:145)
        at rx.internal.operators.BufferUntilSubscriber.onNext(BufferUntilSubscriber.java:178)
        at io.reactivex.netty.protocol.http.UnicastContentSubject.onNext(UnicastContentSubject.java:286)
        at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter.invokeContentOnNext(ClientRequestResponseConverter.java:248)
        at io.reactivex.netty.protocol.http.client.ClientRequestResponseConverter.channelRead(ClientRequestResponseConverter.java:141)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:242)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:147)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.handler.logging.LoggingHandler.channelRead(LoggingHandler.java:283)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.reactivex.netty.metrics.BytesInspector.channelRead(BytesInspector.java:59)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:339)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:324)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:847)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:131)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
        at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
        at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLTransientConnectionException: HikariPool-0 - Connection is not available, request timed out after 30000ms.
        at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:196)
        at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148)
        at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:99)
        at com.github.davidmoten.rx.jdbc.ConnectionProviderPooled.get(ConnectionProviderPooled.java:90)
        ... 123 more
updated Element [id=169901]
done 1
updated Element [id=169902]
done 1
updated Element [id=169903]
2015-12-11 11:39:35 WARN  ElementUpdater:66 - Something went wrong. Retry (2/100)...
com.github.davidmoten.rx.jdbc.exceptions.SQLRuntimeException: java.sql.SQLTransientConnectionException: HikariPool-0 - Connection is not available, request timed out after 30000ms.
        [...] Same stack trace
updated Element [id=169903]
done 1
...

So each time it does 2 requests, it freezes and times out at 30 seconds (I guess cause all the connections are used). It then retries , succeeds at the id it failed on, succeeds at the next and freezes once again when already 2 requests have been done. And so on.

I must not be understanding something about the pools here. Please let me know if i'm doing something wrong.

@Crystark
Copy link
Author

OK seems the problem is elsewhere. I thought backpressure would trigger once one request is being executed (thus skiping some interval ticks) but it's not. My main requests executes twice simultaneously once 2 seconds have been skipped and locks the pool.

Would there be anyway to enforce no more than one main request at a time ?

@Crystark
Copy link
Author

Seems like using lift instead of flatMap works as I want ti to:

updateObservable = Observable.interval(2, 2, TimeUnit.SECONDS)
    .onBackpressureLatest()
    .map(i -> Observable.empty())
    .lift(db
        .select("SELECT * FROM elements")
        .parameterListOperator()
        .autoMap(Element.class)
    )
    .compose(service.asTransformer())
    .doOnNext(t -> System.out.println("updated " + t))
    .map(e -> Observable.<Object> just(e.part1, e.part2, new Timestamp(e.updated_at), e.id))
    .lift(db
        .update("UPDATE elements SET part1 = ?, part2 = ?, updated_at = ? WHERE id = ?")
        .parameterListOperator()
    )
    .compose(Observable::merge)
    .doOnNext(t -> System.out.println("done " + t))
    .retry((i, t) -> {
        if (i < RETRIES) {
            L.warn("Something went wrong. Retry (" + i + "/" + RETRIES + ")...", t);
            return true;
        }
        else {
            L.error("Something is still wrong after " + RETRIES + " retries. Fix it. " + NAME + " stopped.", t);
            return false;
        }
    })
    .onErrorResumeNext(t -> Observable.empty());

@Crystark
Copy link
Author

If you can spare a minute to explain why lift works and not flat map it would be great :)

@davidmoten
Copy link
Owner

Hi, sorry still a bit short on time. I hope to have a look soon.

@ssumit
Copy link

ssumit commented Feb 1, 2017

are there links to blogs/more documentation on transactions?
m having similar issue,
http://stackoverflow.com/questions/41956648/rxjava-jdbc-rollback-transaction-in-psql

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants