Skip to content

feat: multicast playground #111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -26,6 +26,8 @@ I've also been giving talks about Learning Rx using many of the examples listed
14. [Pagination with Rx (using Subjects)](#14-pagination-with-rx-using-subjects)
15. [Orchestrating Observable: make parallel network calls, then combine the result into a single data point (using flatmap & zip)](#15-orchestrating-observable-make-parallel-network-calls-then-combine-the-result-into-a-single-data-point-using-flatmap--zip)
16. [Simple Timeout example (using timeout)](#16-simple-timeout-example-using-timeout)
17. [Setup and teardown resources (using `using`)](#17-setup-and-teardown-resources-using-using)
18. [Multicast playground](#18-multicast-playground)

## Description

@@ -161,7 +163,7 @@ Cases demonstrated here:
4. run a task constantly every 3s, but after running it 5 times, terminate automatically
5. run a task A, pause for sometime, then execute Task B, then terminate

### 11. RxBus : event bus using RxJava (using RxRelay (never terminating Subjects) and debouncedBuffer)
### 11. RxBus : event bus using RxJava (using RxRelay (never terminating Subjects) and debouncedBuffer)

There are accompanying blog posts that do a much better job of explaining the details on this demo:

@@ -222,6 +224,20 @@ This is a simple example demonstrating the use of the `.timeout` operator. Butto

Notice how we can provide a custom Observable that indicates how to react under a timeout Exception.

### 17. Setup and teardown resources (using `using`)

The [operator `using`](http://reactivex.io/documentation/operators/using.html) is relatively less known and notoriously hard to Google. It's a beautiful API that helps to setup a (costly) resource, use it and then dispose off in a clean way.

The nice thing about this operator is that it provides a mechansim to use potentially costly resources in a tightly scoped manner. using -> setup, use and dispose. Think DB connections (like Realm instances), socket connections, thread locks etc.

### 18. Multicast Playground

Multicasting in Rx is like a dark art. Not too many folks know how to pull it off without concern. This example condiers two subscribers (in the forms of buttons) and allows you to add/remove subscribers at different points of time and see how the different operators behave under those circumstances.

The source observale is a timer (`interval`) observable and the reason this was chosen was to intentionally pick a non-terminating observable, so you can test/confirm if your multicast experiment will leak.

_I also gave a talk about [Multicasting in detail at 360|Andev](https://speakerdeck.com/kaushikgopal/rx-by-example-volume-3-the-multicast-edition). If you have the inclination and time, I highly suggest watching that talk first (specifically the Multicast operator permutation segment) and then messing around with the example here._

## Rx 2.x

All the examples here have been migrated to use RxJava 2.X.
6 changes: 5 additions & 1 deletion app/build.gradle
Original file line number Diff line number Diff line change
@@ -18,13 +18,14 @@ apply plugin: 'com.f2prateek.javafmt'
apply plugin: 'kotlin-android'

dependencies {
compile 'com.android.support:multidex:1.0.1'
compile "com.android.support:support-v13:${supportLibVersion}"
compile "com.android.support:appcompat-v7:${supportLibVersion}"
compile "com.android.support:recyclerview-v7:${supportLibVersion}"

compile 'com.github.kaushikgopal:CoreTextUtils:c703fa12b6'
compile "com.jakewharton:butterknife:${butterKnifeVersion}"
annotationProcessor "com.jakewharton:butterknife-compiler:${butterKnifeVersion}"
kapt "com.jakewharton:butterknife-compiler:${butterKnifeVersion}"
compile 'com.jakewharton.timber:timber:4.5.1'
compile "com.squareup.retrofit2:retrofit:${retrofitVersion}"
compile "com.squareup.retrofit2:converter-gson:${retrofitVersion}"
@@ -44,8 +45,10 @@ dependencies {
// explicitly depend on RxJava's latest version for bug fixes and new features.
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'

compile 'com.jakewharton.rx:replaying-share-kotlin:2.0.0'
compile "com.github.akarnokd:rxjava2-extensions:0.16.0"
compile 'com.jakewharton.rxrelay2:rxrelay:2.0.0'

compile 'com.jakewharton.rxbinding2:rxbinding:2.0.0'
compile 'com.jakewharton.retrofit:retrofit2-rxjava2-adapter:1.0.0'

@@ -65,6 +68,7 @@ android {
targetSdkVersion sdkVersion
versionCode 2
versionName "1.2"
multiDexEnabled true
}
buildTypes {
release {
4 changes: 2 additions & 2 deletions app/src/main/java/com/morihacky/android/rxjava/MyApp.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package com.morihacky.android.rxjava;

import android.app.Application;
import android.support.multidex.MultiDexApplication;
import com.morihacky.android.rxjava.volley.MyVolley;
import com.squareup.leakcanary.LeakCanary;
import com.squareup.leakcanary.RefWatcher;
import timber.log.Timber;

public class MyApp extends Application {
public class MyApp extends MultiDexApplication {

private static MyApp _instance;
private RefWatcher _refWatcher;
Original file line number Diff line number Diff line change
@@ -116,6 +116,16 @@ void demoNetworkDetector() {
clickedOn(new NetworkDetectorFragment());
}

@OnClick(R.id.btn_demo_using)
void demoUsing() {
clickedOn(new UsingFragment());
}

@OnClick(R.id.btn_demo_multicastPlayground)
void demoMulticastPlayground() {
clickedOn(new MulticastPlaygroundFragment());
}

private void clickedOn(@NonNull Fragment fragment) {
final String tag = fragment.getClass().toString();
getActivity()
11 changes: 11 additions & 0 deletions app/src/main/kotlin/com/morihacky/android/rxjava/ext/RxExt.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.morihacky.android.rxjava.ext

import io.reactivex.disposables.CompositeDisposable
import io.reactivex.disposables.Disposable

operator fun CompositeDisposable.plus(disposable: Disposable): CompositeDisposable {
add(disposable)
return this
}


Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package com.morihacky.android.rxjava.fragments

import android.content.Context
import android.os.Bundle
import android.os.Handler
import android.os.Looper
import android.view.LayoutInflater
import android.view.View
import android.view.ViewGroup
import android.widget.*
import butterknife.BindView
import butterknife.ButterKnife
import butterknife.OnClick
import com.jakewharton.rx.replayingShare
import com.morihacky.android.rxjava.R
import io.reactivex.Observable
import io.reactivex.disposables.Disposable
import java.util.concurrent.TimeUnit

class MulticastPlaygroundFragment : BaseFragment() {

@BindView(R.id.list_threading_log) lateinit var logList: ListView
@BindView(R.id.dropdown) lateinit var pickOperatorDD: Spinner
@BindView(R.id.msg_text) lateinit var messageText: TextView

private lateinit var sharedObservable: Observable<Long>
private lateinit var adapter: LogAdapter

private var logs: MutableList<String> = ArrayList()
private var disposable1: Disposable? = null
private var disposable2: Disposable? = null

override fun onCreateView(inflater: LayoutInflater?,
container: ViewGroup?,
savedInstanceState: Bundle?): View? {
val layout = inflater!!.inflate(R.layout.fragment_multicast_playground, container, false)
ButterKnife.bind(this, layout)

_setupLogger()
_setupDropdown()

return layout
}

@OnClick(R.id.btn_1)
fun onBtn1Click() {

disposable1?.let {
it.dispose()
_log("subscriber 1 disposed")
disposable1 = null
return
}

disposable1 =
sharedObservable
.doOnSubscribe { _log("subscriber 1 (subscribed)") }
.subscribe({ long -> _log("subscriber 1: onNext $long") })

}

@OnClick(R.id.btn_2)
fun onBtn2Click() {
disposable2?.let {
it.dispose()
_log("subscriber 2 disposed")
disposable2 = null
return
}

disposable2 =
sharedObservable
.doOnSubscribe { _log("subscriber 2 (subscribed)") }
.subscribe({ long -> _log("subscriber 2: onNext $long") })
}

@OnClick(R.id.btn_3)
fun onBtn3Click() {
logs = ArrayList<String>()
adapter.clear()
}

// -----------------------------------------------------------------------------------
// Method that help wiring up the example (irrelevant to RxJava)

private fun _log(logMsg: String) {

if (_isCurrentlyOnMainThread()) {
logs.add(0, logMsg + " (main thread) ")
adapter.clear()
adapter.addAll(logs)
} else {
logs.add(0, logMsg + " (NOT main thread) ")

// You can only do below stuff on main thread.
Handler(Looper.getMainLooper()).post {
adapter.clear()
adapter.addAll(logs)
}
}
}

private fun _setupLogger() {
logs = ArrayList<String>()
adapter = LogAdapter(activity, ArrayList<String>())
logList.adapter = adapter
}

private fun _setupDropdown() {
pickOperatorDD.adapter = ArrayAdapter<String>(context,
android.R.layout.simple_spinner_dropdown_item,
arrayOf(".publish().refCount()",
".publish().autoConnect(2)",
".replay(1).autoConnect(2)",
".replay(1).refCount()",
".replayingShare()"))


pickOperatorDD.onItemSelectedListener = object : AdapterView.OnItemSelectedListener {

override fun onItemSelected(p0: AdapterView<*>?, p1: View?, index: Int, p3: Long) {

val sourceObservable = Observable.interval(0L, 3, TimeUnit.SECONDS)
.doOnSubscribe { _log("observer (subscribed)") }
.doOnDispose { _log("observer (disposed)") }
.doOnTerminate { _log("observer (terminated)") }

sharedObservable =
when (index) {
0 -> {
messageText.setText(R.string.msg_demo_multicast_publishRefCount)
sourceObservable.publish().refCount()
}
1 -> {
messageText.setText(R.string.msg_demo_multicast_publishAutoConnect)
sourceObservable.publish().autoConnect(2)
}
2 -> {
messageText.setText(R.string.msg_demo_multicast_replayAutoConnect)
sourceObservable.replay(1).autoConnect(2)
}
3 -> {
messageText.setText(R.string.msg_demo_multicast_replayRefCount)
sourceObservable.replay(1).refCount()
}
4 -> {
messageText.setText(R.string.msg_demo_multicast_replayingShare)
sourceObservable.replayingShare()
}
else -> throw RuntimeException("got to pick an op yo!")
}
}

override fun onNothingSelected(p0: AdapterView<*>?) {}
}
}

private fun _isCurrentlyOnMainThread(): Boolean {
return Looper.myLooper() == Looper.getMainLooper()
}

private inner class LogAdapter(context: Context, logs: List<String>) :
ArrayAdapter<String>(context, R.layout.item_log, R.id.item_log, logs)

}

Original file line number Diff line number Diff line change
@@ -16,25 +16,21 @@ class PlaygroundFragment : BaseFragment() {
private var _logsList: ListView? = null
private var _adapter: LogAdapter? = null

private var _attempt = 0
private var _logs: MutableList<String> = ArrayList()

override fun onCreateView(inflater: LayoutInflater?,
container: ViewGroup?,
savedInstanceState: Bundle?): View? {
return inflater!!.inflate(R.layout.fragment_concurrency_schedulers, container, false)
}

override fun onActivityCreated(savedInstanceState: Bundle?) {
super.onActivityCreated(savedInstanceState)
val view = inflater?.inflate(R.layout.fragment_concurrency_schedulers, container, false)

_logsList = activity.findViewById(R.id.list_threading_log) as ListView
_logsList = view?.findViewById(R.id.list_threading_log) as ListView
_setupLogger()

activity.findViewById(R.id.btn_start_operation).setOnClickListener { _ ->
view.findViewById(R.id.btn_start_operation).setOnClickListener { _ ->
_log("Button clicked")
}

_setupLogger()
return view
}

// -----------------------------------------------------------------------------------
@@ -44,23 +40,23 @@ class PlaygroundFragment : BaseFragment() {

if (_isCurrentlyOnMainThread()) {
_logs.add(0, logMsg + " (main thread) ")
_adapter!!.clear()
_adapter!!.addAll(_logs)
_adapter?.clear()
_adapter?.addAll(_logs)
} else {
_logs.add(0, logMsg + " (NOT main thread) ")

// You can only do below stuff on main thread.
Handler(Looper.getMainLooper()).post {
_adapter!!.clear()
_adapter!!.addAll(_logs)
_adapter?.clear()
_adapter?.addAll(_logs)
}
}
}

private fun _setupLogger() {
_logs = ArrayList<String>()
_adapter = LogAdapter(activity, ArrayList<String>())
_logsList!!.adapter = _adapter
_logsList?.adapter = _adapter
}

private fun _isCurrentlyOnMainThread(): Boolean {
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package com.morihacky.android.rxjava.fragments

import android.content.Context
import android.os.Bundle
import android.os.Handler
import android.os.Looper
import android.view.LayoutInflater
import android.view.View
import android.view.ViewGroup
import android.widget.ArrayAdapter
import android.widget.ListView
import android.widget.TextView
import com.morihacky.android.rxjava.R
import io.reactivex.Flowable
import io.reactivex.functions.Consumer
import io.reactivex.functions.Function
import org.reactivestreams.Publisher
import timber.log.Timber
import java.util.*
import java.util.concurrent.Callable

class UsingFragment : BaseFragment() {

private lateinit var _logs: MutableList<String>
private lateinit var _logsList: ListView
private lateinit var _adapter: UsingFragment.LogAdapter

override fun onCreateView(inflater: LayoutInflater?, container: ViewGroup?, savedInstanceState: Bundle?): View? {
val view = inflater?.inflate(R.layout.fragment_buffer, container, false)
_logsList = view?.findViewById(R.id.list_threading_log) as ListView

(view.findViewById(R.id.text_description) as TextView).setText(R.string.msg_demo_using)

_setupLogger()
view.findViewById(R.id.btn_start_operation).setOnClickListener { executeUsingOperation() }
return view
}

private fun executeUsingOperation() {
val resourceSupplier = Callable<Realm> { Realm() }
val sourceSupplier = Function<Realm, Publisher<Int>> { realm ->
Flowable.just(true)
.map {
realm.doSomething()
// i would use the copyFromRealm and change it to a POJO
Random().nextInt(50)
}
}
val disposer = Consumer<Realm> { realm ->
realm.clear()
}

Flowable.using(resourceSupplier, sourceSupplier, disposer)
.subscribe({ i ->
_log("got a value $i - (look at the logs)")
})
}

inner class Realm {
init {
Timber.d("--- initializing Realm instance")
}

fun doSomething() {
Timber.d("--- do something with Realm instance")
}

fun clear() {
// notice how this is called even before you manually "dispose"
Timber.d("--- cleaning up the resources (happens before a manual 'dispose'")
}
}

// -----------------------------------------------------------------------------------
// Method that help wiring up the example (irrelevant to RxJava)

private fun _log(logMsg: String) {
_logs.add(0, logMsg)

// You can only do below stuff on main thread.
Handler(Looper.getMainLooper()).post {
_adapter.clear()
_adapter.addAll(_logs)
}
}

private fun _setupLogger() {
_logs = ArrayList<String>()
_adapter = LogAdapter(activity, ArrayList<String>())
_logsList.adapter = _adapter
}

private class LogAdapter(context: Context, logs: List<String>) : ArrayAdapter<String>(context, R.layout.item_log, R.id.item_log, logs)
}
3 changes: 2 additions & 1 deletion app/src/main/res/layout/fragment_buffer.xml
Original file line number Diff line number Diff line change
@@ -7,7 +7,8 @@
>

<TextView
android:layout_height="wrap_content"
android:id="@+id/text_description"
android:layout_height="wrap_content"
android:layout_width="match_parent"
android:padding="10dp"
android:gravity="center"
14 changes: 14 additions & 0 deletions app/src/main/res/layout/fragment_main.xml
Original file line number Diff line number Diff line change
@@ -127,5 +127,19 @@
android:layout_width="match_parent"
android:text="@string/btn_demo_networkDetector"
/>

<Button
android:id="@+id/btn_demo_using"
android:layout_height="wrap_content"
android:layout_width="match_parent"
android:text="@string/btn_demo_using"
/>

<Button
android:id="@+id/btn_demo_multicastPlayground"
android:layout_height="wrap_content"
android:layout_width="match_parent"
android:text="@string/btn_demo_multicastPlayground"
/>
</LinearLayout>
</ScrollView>
72 changes: 72 additions & 0 deletions app/src/main/res/layout/fragment_multicast_playground.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical">


<TableLayout
android:layout_width="match_parent"
android:layout_height="wrap_content">

<TableRow
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:gravity="center">

<Spinner
android:id="@+id/dropdown"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:layout_span="3"/>

</TableRow>

<TableRow
android:layout_width="match_parent"
android:layout_height="wrap_content">

<TextView
android:id="@+id/msg_text"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:layout_weight="1"
android:layout_span="3"
android:padding="@dimen/text_micro"
android:text="@string/msg_demo_multicast_publishRefCount"/>
</TableRow>

<TableRow
android:layout_width="match_parent"
android:layout_height="wrap_content">

<Button
android:id="@+id/btn_1"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_weight="1"
android:text="Subscribe 1"/>

<Button
android:id="@+id/btn_2"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_weight="1"
android:text="Subscribe 2"/>

<Button
android:id="@+id/btn_3"
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:layout_weight="1"
android:text="Clear log"/>
</TableRow>

</TableLayout>


<ListView
android:id="@+id/list_threading_log"
android:layout_width="match_parent"
android:layout_height="match_parent"/>
</LinearLayout>
8 changes: 8 additions & 0 deletions app/src/main/res/values/strings.xml
Original file line number Diff line number Diff line change
@@ -23,6 +23,8 @@
<string name="btn_demo_pagination">Paging example</string>
<string name="btn_demo_pagination_more">MOAR</string>
<string name="btn_demo_networkDetector">Network Detector (Subject)</string>
<string name="btn_demo_using">Setup &amp; teardown resources (using)</string>
<string name="btn_demo_multicastPlayground">MultiConnect operator playground</string>

<string name="msg_demo_pagination">This is a demo of how you can do a list pagination with Rx. We page 10 items at a time and there are 55 items altogether</string>
<string name="msg_demo_volley">This is a Volley request demo</string>
@@ -40,6 +42,12 @@
<string name="msg_demo_timing">BTN 1: run single task once (after 2s complete)\nBTN 2: run task every 1s (start delay of 1s) toggle \nBTN 3: run task every 1s (start immediately) toggle \nBTN 4: run task 5 times every 3s (then complete) \nBTN 5: run task A, pause for sometime, then proceed with Task B</string>
<string name="msg_demo_rotation_persist">This is an example of starting an Observable and using the result across rotations. There are many ways to do this, we use a retained fragment in this example</string>
<string name="msg_demo_network_detector">This is a demo of how to use Subjects to detect Network connectivity\nToggle your Wifi/Network on or off and notice the logs</string>
<string name="msg_demo_using">This is a demo of the somewhat unknown operator "using".\n\nmsg_demo_usingYou typically use it for managing setup/teardown of resources. Classic cases are DB connections (like Realm), sockets, locks etc.\n\nTap the button and look at the logcat. Particularly notice how the Realm instance is self-contained. That is, it is auto-disposed right after use.</string>
<string name="msg_demo_multicast_publishRefCount">RefCount starts the upstream right away and gets disposed off, when all subscribers stop. Hit S1, Hit S2, Hit S1, Hit S2. Hit S1/S2 now and notice the stream starts all over.</string>
<string name="msg_demo_multicast_publishAutoConnect">AutoConnect(2) waits for a min. subscriber count, before starting the upstream. Hit S1 (notice events don\'t start), Hit S2 (notice events now start), Hit S1 (notice that unsubscribing doesn\'t affect upstream), Hit S2, wait for sometime and hit S1 again (notice source stream doesn\'t restart)</string>
<string name="msg_demo_multicast_replayAutoConnect">Replay caches the last item. Hit S1, Hit S2, event starts, Hit S2, wait a bit, Hit S2 again (notice it starts with the last item that S1 saw - courtesy Replay). Hit S2, Hit S1, wait a bit. Hit S1 again (notice event upstream continues and doesn\'t restart)</string>
<string name="msg_demo_multicast_replayRefCount">Replay caches the last item. Hit S1, wait a bit, then hit S2 (notice S2 starts immediately with last item that S1 saw), Hit S2, Hit S1. Hit S1/S2 again (notice the stream restarts all over. Interestingly cached last item also removed when both subscribers released)</string>
<string name="msg_demo_multicast_replayingShare">Courtesy: new #AndroidDev on the block - JakeWharton. exactly like replay(1).refCount(), but caches the last item even when upstream has been disposed off/released. Hit S1, Hit S2, Hit S1, Hit S2 (notice observable is disposed). Hit S1/S2 again (notice we start with last item emitted)</string>

<string name="msg_pseudoCache_demoInfo_concat">Concat merges the results sequentially. But notice that the latter subscription starts only AFTER the first one completes. Some unnecessary waiting there.</string>
<string name="msg_pseudoCache_demoInfo_concatEager">Concat eager is cooler. Both subscriptions start at the same time (parallely) but the order of emission is respected.</string>
4 changes: 2 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -6,7 +6,7 @@ buildscript {
mavenCentral()
}
dependencies {
classpath 'com.android.tools.build:gradle:2.3.2'
classpath 'com.android.tools.build:gradle:2.3.3'
classpath 'com.f2prateek.javafmt:javafmt:0.1.2'

// NOTE: Do not place your application dependencies here; they belong
@@ -26,7 +26,7 @@ allprojects {
sdkVersion = 24
buildToolsVrs = "25.0.0"

kotlinVersion = "1.1.2-4"
kotlinVersion = "1.1.3-2"

butterKnifeVersion = '8.5.1'
mockitoKotlinVersion = "1.4.0"
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-3.5-all.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.0-all.zip