Skip to content

Commit a1be863

Browse files
authored
Better http channel (#7)
* add flag to http endpoint to await request handled * add default reponse code * implement feedback (rename res to resolve) * bump alpha version
1 parent 5a33e0e commit a1be863

File tree

4 files changed

+110
-15
lines changed

4 files changed

+110
-15
lines changed

channels/http.ttl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@
2525
sh:maxCount 1;
2626
sh:datatype xsd:boolean;
2727
sh:description "Stream raw bytes if true";
28+
], [
29+
sh:path :waitHandled;
30+
sh:name "waitHandled";
31+
sh:maxCount 1;
32+
sh:datatype xsd:boolean;
33+
sh:description "Wait for handlers to be have handled the incoming message";
34+
], [
35+
sh:path :responseCode;
36+
sh:name "responseCode";
37+
sh:maxCount 1;
38+
sh:datatype xsd:integer;
39+
sh:description "Specify the expected response code (default 200)";
2840
].
2941

3042
[ ] a sh:NodeShape;

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@ajuvercr/js-runner",
3-
"version": "0.1.16",
3+
"version": "0.1.18-alpha.0",
44
"type": "module",
55
"exports": {
66
"import": "./dist/index.js",

src/connectors/http.ts

Lines changed: 23 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,18 @@ import {
1515
WriterConstructor,
1616
} from "../connectors";
1717

18-
function streamToString(stream: Readable, binary: boolean): Promise<string | Buffer> {
18+
function streamToString(
19+
stream: Readable,
20+
binary: boolean,
21+
): Promise<string | Buffer> {
1922
const datas = <Buffer[]>[];
2023
return new Promise((res) => {
2124
stream.on("data", (data) => {
2225
datas.push(data);
2326
});
2427
stream.on("end", () => {
2528
const streamData = Buffer.concat(datas);
26-
res(binary ? streamData : streamData.toString())
29+
res(binary ? streamData : streamData.toString());
2730
});
2831
});
2932
}
@@ -32,6 +35,8 @@ export interface HttpReaderConfig extends Config {
3235
endpoint: string;
3336
port: number;
3437
binary: boolean;
38+
waitHandled?: boolean;
39+
responseCode?: number;
3540
}
3641

3742
export const startHttpStreamReader: ReaderConstructor<HttpReaderConfig> = (
@@ -42,11 +47,12 @@ export const startHttpStreamReader: ReaderConstructor<HttpReaderConfig> = (
4247
const stream = new SimpleStream<string | Buffer>(
4348
() =>
4449
new Promise((res) => {
45-
const cb = (): void => res();
4650
if (server !== undefined) {
47-
server.close(cb);
51+
server.close(() => {
52+
res();
53+
});
4854
} else {
49-
cb();
55+
res();
5056
}
5157
}),
5258
);
@@ -57,20 +63,23 @@ export const startHttpStreamReader: ReaderConstructor<HttpReaderConfig> = (
5763
) {
5864
try {
5965
const content = await streamToString(req, config.binary);
60-
stream.push(content).catch((error) => {
66+
67+
const promise = stream.push(content).catch((error) => {
6168
throw error;
6269
});
70+
if (config.waitHandled) {
71+
await promise;
72+
}
6373
} catch (error: unknown) {
6474
console.error("Failed", error);
6575
}
6676

67-
res.writeHead(200);
77+
res.writeHead(config.responseCode || 200);
6878
res.end("OK");
6979
};
7080

7181
server = createServer(requestListener);
7282
const init = () => {
73-
console.log("HTTP init!");
7483
return new Promise<void>((res) => {
7584
const cb = (): void => res(undefined);
7685
if (server) {
@@ -94,23 +103,25 @@ export const startHttpStreamWriter: WriterConstructor<HttpWriterConfig> = (
94103
const requestConfig = <https.RequestOptions>new URL(config.endpoint);
95104

96105
const push = async (item: string | Buffer): Promise<void> => {
97-
await new Promise((res) => {
106+
await new Promise(async (resolve) => {
98107
const options = {
99108
hostname: requestConfig.hostname,
100109
path: requestConfig.path,
101110
method: config.method,
102111
port: requestConfig.port,
103112
};
113+
104114
const cb = (response: IncomingMessage): void => {
105115
response.on("data", () => {});
106116
response.on("end", () => {
107-
res(null);
117+
resolve(null);
108118
});
109119
};
110120

111121
const req = http.request(options, cb);
112-
req.write(item, () => res(null));
113-
req.end();
122+
await new Promise((res) => req.write(item, res));
123+
await new Promise((res) => req.end(res));
124+
// res(null);
114125
});
115126
};
116127

test/connectors/http.test.ts

Lines changed: 74 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ describe("connector-http", () => {
1919
const factory = new conn.ChannelFactory();
2020
const reader = factory.createReader(readerConfig);
2121
const writer = factory.createWriter(writerConfig);
22-
22+
2323
reader.data((data) => {
2424
items.push(data);
2525
});
@@ -43,6 +43,7 @@ describe("connector-http", () => {
4343
endpoint: "localhost",
4444
port: 8081,
4545
binary: true,
46+
waitHandled: false,
4647
ty: conn.Conn.HttpReaderChannel,
4748
};
4849
const writerConfig: HttpWriterConfig = {
@@ -54,8 +55,9 @@ describe("connector-http", () => {
5455
const factory = new conn.ChannelFactory();
5556
const reader = factory.createReader(readerConfig);
5657
const writer = factory.createWriter(writerConfig);
57-
58+
5859
reader.data((data) => {
60+
console.log("This reader works");
5961
expect(Buffer.isBuffer(data)).toBeTruthy();
6062
items.push(data.toString());
6163
});
@@ -73,6 +75,76 @@ describe("connector-http", () => {
7375

7476
await Promise.all([reader.end(), writer.end()]);
7577
});
78+
79+
test("Should write -> HTTP -> read (Buffer) and await response", async () => {
80+
const readerConfig: HttpReaderConfig = {
81+
endpoint: "localhost",
82+
port: 8082,
83+
binary: true,
84+
waitHandled: true,
85+
ty: conn.Conn.HttpReaderChannel,
86+
};
87+
const writerConfig: HttpWriterConfig = {
88+
endpoint: "http://localhost:8082",
89+
method: "POST",
90+
ty: conn.Conn.HttpWriterChannel,
91+
};
92+
93+
const factory = new conn.ChannelFactory();
94+
const reader = factory.createReader(readerConfig);
95+
const writer = factory.createWriter(writerConfig);
96+
97+
reader.data(async (data) => {
98+
expect(Buffer.isBuffer(data)).toBeTruthy();
99+
items.push(data.toString());
100+
await sleep(1500);
101+
});
102+
103+
await factory.init();
104+
105+
const items: unknown[] = [];
106+
107+
const start = new Date().getTime();
108+
await writer.push(Buffer.from("test1", "utf8"));
109+
const end = new Date().getTime();
110+
await sleep(200);
111+
112+
expect(end - start > 1000).toBeTruthy();
113+
expect(items).toEqual(["test1"]);
114+
115+
await Promise.all([reader.end(), writer.end()]);
116+
});
117+
118+
test("http channel uses correct response code", async () => {
119+
const readerConfig: HttpReaderConfig = {
120+
endpoint: "localhost",
121+
port: 8083,
122+
binary: false,
123+
responseCode: 202,
124+
ty: conn.Conn.HttpReaderChannel,
125+
};
126+
127+
const factory = new conn.ChannelFactory();
128+
const reader = factory.createReader(readerConfig);
129+
130+
reader.data((data) => {
131+
items.push(data);
132+
});
133+
134+
await factory.init();
135+
136+
const items: unknown[] = [];
137+
138+
const resp = await fetch("http://localhost:8083", {
139+
body: "test1",
140+
method: "PUT",
141+
});
142+
143+
expect(items).toEqual(["test1"]);
144+
expect(resp.status).toEqual(202);
145+
146+
await Promise.all([reader.end()]);
147+
});
76148
});
77149

78150
function sleep(x: number): Promise<unknown> {

0 commit comments

Comments
 (0)