Skip to content

Commit 7163f35

Browse files
committedOct 31, 2021
initial commit
0 parents  commit 7163f35

File tree

4 files changed

+487
-0
lines changed

4 files changed

+487
-0
lines changed
 

‎Makefile

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
LIB= amqtt
2+
SRCS= amqtt.c
3+
MAN=
4+
5+
WARNINGS= Yes
6+
7+
.include <bsd.lib.mk>

‎amqtt.c

+320
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,320 @@
1+
/* */
2+
3+
/*
4+
* Copyright (c) 2021 David Gwynne <david@gwynne.id.au>
5+
*
6+
* Permission to use, copy, modify, and distribute this software for any
7+
* purpose with or without fee is hereby granted, provided that the above
8+
* copyright notice and this permission notice appear in all copies.
9+
*
10+
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
19+
#include <sys/queue.h>
20+
21+
#include <stdlib.h>
22+
#include <string.h>
23+
#include <stdint.h>
24+
25+
#include "mqtt_protocol.h"
26+
#include "amqtt.h"
27+
28+
struct mqtt_message {
29+
uint8_t *mm_buf;
30+
size_t mm_len;
31+
size_t mm_off;
32+
int mm_id;
33+
34+
TAILQ_ENTRY(mqtt_message)
35+
mm_entry;
36+
};
37+
38+
TAILQ_HEAD(mqtt_messages, mqtt_message);
39+
40+
struct mqtt_conn {
41+
void *mc_cookie;
42+
const struct mqtt_settings
43+
*mc_settings;
44+
45+
struct mqtt_messages
46+
mc_messages;
47+
};
48+
49+
static size_t
50+
mqtt_header_set(void *buf, uint8_t type, uint8_t flags, size_t len)
51+
{
52+
struct mqtt_header *hdr = buf;
53+
uint8_t *p = hdr->remlen;
54+
size_t rv = sizeof(hdr->p);
55+
56+
hdr->p = (type << 4) | flags;
57+
58+
do {
59+
uint8_t byte = len & 0x7f;
60+
len >>= 7;
61+
if (len)
62+
byte |= 0x80;
63+
64+
*p++ = byte;
65+
rv++;
66+
} while (len);
67+
68+
return (rv);
69+
}
70+
71+
static void
72+
mqtt_u16(struct mqtt_u16 *mu16, uint16_t u16)
73+
{
74+
mu16->hi = u16 >> 8;
75+
mu16->lo = u16 >> 0;
76+
}
77+
78+
static size_t
79+
mqtt_lenstr(void *buf, uint16_t len, const void *str)
80+
{
81+
struct mqtt_u16 *mu16 = buf;
82+
83+
memcpy(mu16 + 1, str, len);
84+
85+
return (sizeof(*mu16) + len);
86+
}
87+
88+
void *
89+
mqtt_cookie(struct mqtt_conn *mc)
90+
{
91+
return (mc->mc_cookie);
92+
}
93+
94+
struct mqtt_conn *
95+
mqtt_conn_create(const struct mqtt_settings *ms, void *cookie)
96+
{
97+
struct mqtt_conn *mc;
98+
99+
mc = malloc(sizeof(*mc));
100+
if (mc == NULL)
101+
return (NULL);
102+
103+
mc->mc_cookie = cookie;
104+
mc->mc_settings = ms;
105+
TAILQ_INIT(&mc->mc_messages);
106+
107+
return (mc);
108+
}
109+
110+
void
111+
mqtt_conn_destroy(struct mqtt_conn *mc)
112+
{
113+
free(mc);
114+
}
115+
116+
static int
117+
mqtt_enqueue(struct mqtt_conn *mc, int id, void *msg, size_t len)
118+
{
119+
struct mqtt_message *mm;
120+
121+
mm = malloc(sizeof(*mm));
122+
if (mm == NULL)
123+
return (-1);
124+
125+
mm->mm_buf = msg;
126+
mm->mm_len = len;
127+
mm->mm_off = 0;
128+
mm->mm_id = id;
129+
130+
TAILQ_INSERT_TAIL(&mc->mc_messages, mm, mm_entry);
131+
132+
/* push hard */
133+
mqtt_output(mc);
134+
135+
return (0);
136+
}
137+
138+
void
139+
mqtt_input(struct mqtt_conn *mc, const void *buf, size_t len)
140+
{
141+
142+
}
143+
144+
void
145+
mqtt_output(struct mqtt_conn *mc)
146+
{
147+
struct mqtt_message *mm = TAILQ_FIRST(&mc->mc_messages);
148+
ssize_t rv;
149+
150+
do {
151+
rv = (*mc->mc_settings->mqtt_output)(mc,
152+
mm->mm_buf + mm->mm_off, mm->mm_len - mm->mm_off);
153+
if (rv == -1)
154+
return;
155+
156+
mm->mm_off += rv;
157+
if (mm->mm_off <= mm->mm_len) {
158+
(*mc->mc_settings->mqtt_want_output)(mc);
159+
return;
160+
}
161+
162+
TAILQ_REMOVE(&mc->mc_messages, mm, mm_entry);
163+
free(mm->mm_buf);
164+
if (mm->mm_id == -1)
165+
free(mm);
166+
167+
mm = TAILQ_FIRST(&mc->mc_messages);
168+
} while (mm != NULL);
169+
}
170+
171+
int
172+
mqtt_connect(struct mqtt_conn *mc, const struct mqtt_conn_settings *mcs)
173+
{
174+
uint8_t *msg, *buf;
175+
struct mqtt_p_connect *pc;
176+
size_t len = sizeof(*pc);
177+
size_t hlen;
178+
uint8_t flags = 0;
179+
uint16_t keep_alive = 30;
180+
181+
if (mcs->clean_session)
182+
flags |= MQTT_CONNECT_F_CLEAN_SESSION;
183+
if (mcs->keep_alive > 0) {
184+
keep_alive = mcs->keep_alive;
185+
if (keep_alive > 0xffff)
186+
return (-1);
187+
}
188+
189+
if (mcs->clientid_len > MQTT_MAX_LEN)
190+
return (-1);
191+
len += sizeof(struct mqtt_u16) + mcs->clientid_len;
192+
193+
if (mcs->will_topic != NULL) {
194+
if (mcs->will_topic_len > MQTT_MAX_LEN)
195+
return (-1);
196+
len += sizeof(struct mqtt_u16) + mcs->will_topic_len;
197+
198+
if (mcs->will_payload_len > MQTT_MAX_LEN)
199+
return (-1);
200+
len += sizeof(struct mqtt_u16) + mcs->will_payload_len;
201+
202+
flags |= MQTT_CONNECT_F_WILL;
203+
flags |= MQTT_CONNECT_F_WILL_QOS(mcs->will_qos);
204+
if (mcs->will_retain)
205+
flags |= MQTT_CONNECT_F_WILL_RETAIN;
206+
}
207+
208+
if (mcs->username != NULL) {
209+
if (mcs->username_len > 0xfff)
210+
return (-1);
211+
len += sizeof(struct mqtt_u16) + mcs->username_len;
212+
213+
flags |= MQTT_CONNECT_F_USERNAME;
214+
215+
if (mcs->password != NULL) {
216+
if (mcs->password_len > MQTT_MAX_LEN)
217+
return (-1);
218+
len += sizeof(struct mqtt_u16) + mcs->password_len;
219+
}
220+
}
221+
222+
if (len > MQTT_MAX_REMLEN)
223+
return (-1);
224+
225+
msg = malloc(sizeof(struct mqtt_header) + len);
226+
if (msg == NULL)
227+
return (-1);
228+
229+
hlen = mqtt_header_set(msg, MQTT_T_CONNECT, 0, len);
230+
buf = msg + hlen;
231+
232+
pc = (struct mqtt_p_connect *)buf;
233+
mqtt_u16(&pc->len, sizeof(pc->mqtt));
234+
pc->mqtt[0] = 'M';
235+
pc->mqtt[1] = 'Q';
236+
pc->mqtt[2] = 'T';
237+
pc->mqtt[3] = 'T';
238+
pc->level = 0x4;
239+
pc->flags = flags;
240+
mqtt_u16(&pc->keep_alive, keep_alive);
241+
242+
buf += sizeof(*pc);
243+
244+
buf += mqtt_lenstr(buf, mcs->clientid_len, mcs->clientid);
245+
if (mcs->will_topic != NULL) {
246+
buf += mqtt_lenstr(buf,
247+
mcs->will_topic_len, mcs->will_topic);
248+
buf += mqtt_lenstr(buf,
249+
mcs->will_payload_len, mcs->will_payload);
250+
}
251+
if (mcs->username != NULL) {
252+
buf += mqtt_lenstr(buf,
253+
mcs->username_len, mcs->username);
254+
if (mcs->password != NULL) {
255+
buf += mqtt_lenstr(buf,
256+
mcs->password_len, mcs->password);
257+
}
258+
}
259+
260+
/* try to shove the message onto the transport straight away */
261+
if (mqtt_enqueue(mc, -1, msg, hlen + len) == -1) {
262+
free(msg);
263+
return (-1);
264+
}
265+
266+
return (0);
267+
}
268+
269+
void
270+
mqtt_disconnect(struct mqtt_conn *mc)
271+
{
272+
273+
}
274+
275+
int
276+
mqtt_publish(struct mqtt_conn *mc,
277+
const char *topic, size_t topic_len,
278+
const char *payload, size_t payload_len,
279+
enum mqtt_qos qos, unsigned int retain)
280+
{
281+
uint8_t *msg, *buf;
282+
size_t len = 0;
283+
size_t hlen;
284+
uint8_t flags = 0;
285+
286+
if (retain)
287+
flags |= (1 << 0);
288+
flags |= qos << 1;
289+
290+
if (topic_len > MQTT_MAX_LEN)
291+
return (-1);
292+
len += sizeof(struct mqtt_u16) + topic_len;
293+
294+
if (qos != MQTT_QOS0) {
295+
//len += sizeof(struct mqtt_u16);
296+
return (-1); /* XXX */
297+
}
298+
299+
len += payload_len;
300+
if (len > MQTT_MAX_REMLEN)
301+
return (-1);
302+
303+
msg = malloc(sizeof(struct mqtt_header) + len);
304+
if (msg == NULL)
305+
return (-1);
306+
307+
hlen = mqtt_header_set(msg, MQTT_T_PUBLISH, flags, len);
308+
buf = msg + hlen;
309+
310+
buf += mqtt_lenstr(buf, topic_len, topic);
311+
memcpy(buf, payload, payload_len);
312+
313+
/* try to shove the message onto the transport straight away */
314+
if (mqtt_enqueue(mc, -1, msg, hlen + len) == -1) {
315+
free(msg);
316+
return (-1);
317+
}
318+
319+
return (0);
320+
}

‎amqtt.h

+94
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
2+
/*
3+
* Copyright (c) 2021 David Gwynne <david@gwynne.id.au>
4+
*
5+
* Permission to use, copy, modify, and distribute this software for any
6+
* purpose with or without fee is hereby granted, provided that the above
7+
* copyright notice and this permission notice appear in all copies.
8+
*
9+
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
10+
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
11+
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
12+
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
13+
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
14+
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
15+
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
16+
*/
17+
18+
struct mqtt_conn;
19+
struct timeval;
20+
21+
enum mqtt_qos {
22+
MQTT_QOS0,
23+
MQTT_QOS1,
24+
MQTT_QOS2,
25+
};
26+
27+
struct mqtt_settings {
28+
unsigned int mqtt_max_topic;
29+
unsigned int mqtt_max_payload;
30+
31+
void (*mqtt_want_output)(struct mqtt_conn *);
32+
ssize_t (*mqtt_output)(struct mqtt_conn *,
33+
const void *, size_t);
34+
void (*mqtt_want_timeout)(struct mqtt_conn *,
35+
const struct timeval *);
36+
37+
void (*mqtt_on_connect)(struct mqtt_conn *);
38+
void (*mqtt_on_message)(struct mqtt_conn *,
39+
char *, size_t, char *, size_t,
40+
enum mqtt_qos, unsigned int);
41+
void (*mqtt_on_suback)(struct mqtt_conn *, int,
42+
uint8_t *, size_t);
43+
void (*mqtt_on_unsuback)(struct mqtt_conn *, int);
44+
45+
};
46+
47+
struct mqtt_conn_settings {
48+
unsigned int clean_session;
49+
unsigned int keep_alive;
50+
51+
const char *clientid;
52+
size_t clientid_len;
53+
const char *username;
54+
size_t username_len;
55+
const char *password;
56+
size_t password_len;
57+
58+
const char *will_topic;
59+
size_t will_topic_len;
60+
const char *will_payload;
61+
size_t will_payload_len;
62+
enum mqtt_qos will_qos;
63+
unsigned int will_retain;
64+
};
65+
66+
struct mqtt_conn *mqtt_conn_create(const struct mqtt_settings *,
67+
void *);
68+
int mqtt_connect(struct mqtt_conn *,
69+
const struct mqtt_conn_settings *);
70+
void *mqtt_cookie(struct mqtt_conn *);
71+
void mqtt_input(struct mqtt_conn *, const void *, size_t);
72+
void mqtt_output(struct mqtt_conn *);
73+
void mqtt_disconnect(struct mqtt_conn *);
74+
void mqtt_conn_destroy(struct mqtt_conn *);
75+
76+
struct mqtt_topic {
77+
const char *filter;
78+
size_t len;
79+
enum mqtt_qos qos;
80+
};
81+
82+
int mqtt_publish(struct mqtt_conn *,
83+
const char *, size_t, const char *, size_t,
84+
enum mqtt_qos, unsigned int);
85+
86+
int mqtt_subscribe(struct mqtt_conn *,
87+
const char *, size_t, enum mqtt_qos);
88+
int mqtt_subscribev(struct mqtt_conn *,
89+
const struct mqtt_topic *, int);
90+
int mqtt_unsubscribe(struct mqtt_conn *,
91+
const char *, size_t, enum mqtt_qos);
92+
int mqtt_unsubscribev(struct mqtt_conn *,
93+
const struct mqtt_topic *, int);
94+
int mqtt_ping(struct mqtt_conn *);

‎mqtt_protocol.h

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
/* */
2+
3+
/*
4+
* Copyright (c) 2021 David Gwynne <david@gwynne.id.au>
5+
*
6+
* Permission to use, copy, modify, and distribute this software for any
7+
* purpose with or without fee is hereby granted, provided that the above
8+
* copyright notice and this permission notice appear in all copies.
9+
*
10+
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11+
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12+
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13+
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14+
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15+
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16+
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17+
*/
18+
19+
#define MQTT_MAX_REMLEN 268435455
20+
#define MQTT_MAX_LEN 0xffff
21+
22+
#define MQTT_T_CONNECT 1
23+
#define MQTT_T_CONNACK 2
24+
#define MQTT_T_PUBLISH 3
25+
#define MQTT_T_PUBACK 4
26+
#define MQTT_T_PUBREC 5
27+
#define MQTT_T_PUBREL 6
28+
#define MQTT_T_PUBCOMP 7
29+
#define MQTT_T_SUBSCRIBE 8
30+
#define MQTT_T_SUBACK 9
31+
#define MQTT_T_UNSUBSCRIBE 10
32+
#define MQTT_T_UNSUBACK 11
33+
#define MQTT_T_PINGREQ 12
34+
#define MQTT_T_PINGRESP 13
35+
#define MQTT_T_DISCONNECT 14
36+
37+
#define MQTT_TYPE(_t) ((_t) << 4)
38+
39+
/*
40+
* this represents the maximum sized header, not necessarily the
41+
* actual header on the wire.
42+
*/
43+
44+
struct mqtt_header {
45+
uint8_t p;
46+
uint8_t remlen[4];
47+
};
48+
49+
struct mqtt_u16 {
50+
uint8_t hi;
51+
uint8_t lo;
52+
};
53+
54+
struct mqtt_p_connect {
55+
struct mqtt_u16 len;
56+
uint8_t mqtt[4];
57+
uint8_t level;
58+
uint8_t flags;
59+
#define MQTT_CONNECT_F_CLEAN_SESSION (1 << 1)
60+
#define MQTT_CONNECT_F_WILL (1 << 2)
61+
#define MQTT_CONNECT_F_WILL_QOS(_qos) ((_qos) << 3)
62+
#define MQTT_CONNECT_F_WILL_RETAIN (1 << 5)
63+
#define MQTT_CONNECT_F_PASSWORD (1 << 6)
64+
#define MQTT_CONNECT_F_USERNAME (1 << 7)
65+
struct mqtt_u16 keep_alive;
66+
};

0 commit comments

Comments
 (0)
Please sign in to comment.