Skip to content

Commit

Permalink
add server side event support to UI
Browse files Browse the repository at this point in the history
  • Loading branch information
toddtreece committed Jan 13, 2020
1 parent e38f309 commit 19a2519
Show file tree
Hide file tree
Showing 6 changed files with 85 additions and 9 deletions.
6 changes: 5 additions & 1 deletion ui/src/pages/Data.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import moment from 'moment';

import { ApplicationState } from '../store';
import { Data } from '../client';
import { fetchRequest } from '../store/data/actions';
import { fetchRequest, startSubscription, endSubscription } from '../store/data/actions';

interface RouteInfo {
key: string;
Expand All @@ -27,6 +27,10 @@ export const DataPage: React.FC<MainProps> = props => {

useEffect(() => {
dispatch(fetchRequest(params.key));
dispatch(startSubscription(params.key));
return () => {
dispatch(endSubscription(params.key));
};
}, [dispatch]);

// TODO tjt: move data formatting to store
Expand Down
7 changes: 5 additions & 2 deletions ui/src/store/data/actions.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
import { action } from 'typesafe-actions';
import { DataActionTypes } from './types';
import { Stream } from '../../client';
import { Data } from '../../client';

export const fetchRequest = (key: string) => action(DataActionTypes.FETCH_REQUEST, key);
export const fetchSuccess = (data: Stream[]) => action(DataActionTypes.FETCH_SUCCESS, data);
export const fetchSuccess = (data: Data[]) => action(DataActionTypes.FETCH_SUCCESS, data);
export const fetchError = (message: string) => action(DataActionTypes.FETCH_ERROR, message);
export const startSubscription = (key: string) => action(DataActionTypes.START_SUBSCRIPTION, key);
export const endSubscription = (key: string) => action(DataActionTypes.END_SUBSCRIPTION, key);
export const newMessage = (data: Data) => action(DataActionTypes.NEW_MESSAGE, data);
11 changes: 10 additions & 1 deletion ui/src/store/data/reducer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,23 @@ export const initialState: DataState = {
const reducer: Reducer<DataState> = (state = initialState, action) => {
switch (action.type) {
case DataActionTypes.FETCH_REQUEST: {
return { ...state, loading: true, key: action.key };
return { ...state, loading: true, key: action.payload };
}
case DataActionTypes.FETCH_SUCCESS: {
return { ...state, loading: false, data: action.payload };
}
case DataActionTypes.FETCH_ERROR: {
return { ...state, loading: false, errors: action.payload };
}
case DataActionTypes.NEW_MESSAGE: {
return { ...state, data: [action.payload, ...state.data] };
}
case DataActionTypes.START_SUBSCRIPTION: {
return state;
}
case DataActionTypes.END_SUBSCRIPTION: {
return state;
}
default: {
return state;
}
Expand Down
58 changes: 54 additions & 4 deletions ui/src/store/data/sagas.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
import { all, call, fork, put, takeLatest } from 'redux-saga/effects';
import { all, call, fork, put, takeEvery, take, cancel } from 'redux-saga/effects';
import { eventChannel, EventChannel } from 'redux-saga';

import { DataActionTypes } from './types';
import { fetchError, fetchSuccess, fetchRequest } from './actions';
import { fetchError, fetchSuccess, fetchRequest, newMessage } from './actions';
import { DataApi, Configuration } from '../../client';
import { debug } from '../../utils/logger';

const api = new DataApi(new Configuration({ basePath: '/api/v1' }));

Expand All @@ -19,12 +21,60 @@ function* handleFetch({ payload }: ReturnType<typeof fetchRequest>) {
}
}

function createSubscription(key: string) {
return new EventSource(`/api/v1/streams/${key}/subscribe`);
}

function createSubscriptionChannel(subscription: EventSource) {
return eventChannel(emit => {
subscription.onmessage = event => emit(event.data);
return () => {
return subscription.close();
};
});
}

function* handleMessages(channel: EventChannel<unknown>, key: string) {
try {
while (true) {
const message = yield take(channel);
try {
const data = {
payload: JSON.parse(message),
created: new Date().toISOString(),
updated: new Date().toISOString()
};
debug(`/streams/${key}/subscription ->`, message);
yield put(newMessage(data));
} catch (err) {
debug(`/streams/${key}/subscription ->`, message);
}
}
} finally {
debug(`/streams/${key}/subscription -> closing`);
channel.close();
}
}

function* watchMessages() {
while (true) {
const { payload: key } = yield take(DataActionTypes.START_SUBSCRIPTION);
const subscription = yield call(createSubscription, key);

const channel = yield call(createSubscriptionChannel, subscription);
const handleMessagesTask = yield fork(handleMessages, channel, key);

yield take(DataActionTypes.END_SUBSCRIPTION);
yield cancel(handleMessagesTask);
}
}

function* watchFetchRequest() {
yield takeLatest(DataActionTypes.FETCH_REQUEST, handleFetch);
yield takeEvery(DataActionTypes.FETCH_REQUEST, handleFetch);
}

function* dataSaga() {
yield all([fork(watchFetchRequest)]);
yield all([fork(watchFetchRequest), fork(watchMessages)]);
}

export default dataSaga;
5 changes: 4 additions & 1 deletion ui/src/store/data/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ import { Data } from '../../client';
export enum DataActionTypes {
FETCH_REQUEST = '@@data/FETCH_REQUEST',
FETCH_SUCCESS = '@@data/FETCH_SUCCESS',
FETCH_ERROR = '@@data/FETCH_ERROR'
FETCH_ERROR = '@@data/FETCH_ERROR',
NEW_MESSAGE = '@@data/NEW_MESSAGE',
START_SUBSCRIPTION = '@@data/START_SUBSCRIPTION',
END_SUBSCRIPTION = '@@data/END_SUBSCRIPTION'
}

export interface DataState {
Expand Down
7 changes: 7 additions & 0 deletions ui/src/utils/logger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
export function log(...args: any[]) {
console.log('[dlphn]', ...args);
}

export function debug(...args: any[]) {
console.debug('[dlphn]', ...args);
}

0 comments on commit 19a2519

Please sign in to comment.