-
Notifications
You must be signed in to change notification settings - Fork 938
/
mqtt_aclient.hpp
184 lines (147 loc) · 4.77 KB
/
mqtt_aclient.hpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#pragma once
#include "../acl_cpp_define.hpp"
#include <string>
#include "../stream/aio_socket_stream.hpp"
namespace acl {
// Forward declarations: in this header these types are only referenced
// through pointers or references, so their full definitions are not needed.
class sslbase_conf;
class mqtt_header;
class mqtt_message;
/**
 * mqtt communication class in aio mode, used by mqtt client or mqtt server.
 * The class is abstract: a subclass must implement destroy(), on_open()
 * and on_body(). The destructor is protected, so the subclass object
 * should be created dynamically and released through destroy().
 */
class ACL_CPP_API mqtt_aclient : public aio_open_callback {
public:
	/**
	 * Constructor
	 * @param handle {aio_handle&}
	 * @param ssl_conf {sslbase_conf*} if not NULL, ssl will be used
	 */
	explicit mqtt_aclient(aio_handle& handle, sslbase_conf* ssl_conf = NULL);

	/**
	 * Because the subclass object was created dynamically, the method
	 * will be called when the subclass object is to be freed.
	 */
	virtual void destroy() = 0;

	/**
	 * Get the ssl conf object passed in constructor.
	 * @return {sslbase_conf*} return NULL if not set.
	 */
	sslbase_conf* get_ssl_conf() const {
		return ssl_conf_;
	}

	/**
	 * Connect the remote mqtt server, when connected with the server,
	 * the callback on_connect() will be called
	 * @param addr {const char*} the mqtt server's addr with the format
	 *  ip|port, or domain|port
	 * @param conn_timeout {int} the timeout for connecting to the server
	 * @param rw_timeout {int} the timeout read/write with the server
	 * @return {bool} if return false, you should call destroy() to
	 *  delete the subclass object
	 */
	bool open(const char* addr, int conn_timeout, int rw_timeout);

	/**
	 * Called when connect or accept one connection
	 * @param conn {aio_socket_stream*}
	 * @return {bool} if return false, you should call destroy() to
	 *  delete the subclass object
	 */
	bool open(aio_socket_stream* conn);

	/**
	 * Close the connection with the mqtt server async
	 */
	void close();

	/**
	 * Get the connection with the mqtt server
	 * @return {aio_socket_stream*} return NULL if not connected
	 */
	aio_socket_stream* get_conn() const {
		return conn_;
	}

	/**
	 * Set the remote host name to specify the SSL SNI for SSL handshake,
	 * used to connect one mqtt server as a connection client.
	 * @param host {const char*} the host name
	 * @return {mqtt_aclient&}
	 */
	mqtt_aclient& set_host(const char* host);

	/**
	 * Set the SNI prefix.
	 * NOTE(review): presumably the prefix is combined with the host name
	 * to build the SNI used in SSL handshake -- confirm against the
	 * implementation.
	 * @param prefix {const char*} the SNI prefix
	 * @return {mqtt_aclient&}
	 */
	mqtt_aclient& set_sni_prefix(const char* prefix);

	/**
	 * Set the SNI suffix.
	 * NOTE(review): presumably the suffix is combined with the host name
	 * to build the SNI used in SSL handshake -- confirm against the
	 * implementation.
	 * @param suffix {const char*} the SNI suffix
	 * @return {mqtt_aclient&}
	 */
	mqtt_aclient& set_sni_suffix(const char* suffix);

public:
	/**
	 * Send one mqtt message to one mqtt peer.
	 * @param message {mqtt_message&}
	 * @return {bool} return true if sending successfully, or false if
	 *  some error happened.
	 */
	bool send(mqtt_message& message);

public:
	/**
	 * Get the current dns addr when connecting one mqtt server
	 * @param out {string&} store the result.
	 * @return {bool} return true if getting dns address successfully.
	 */
	bool get_ns_addr(string& out) const;

	/**
	 * Get the mqtt server addr after resolving the domain's address.
	 * @param out {string&} store the result.
	 * @return {bool} return true if getting server's address successfully.
	 */
	bool get_server_addr(string& out) const;

protected:
	// the subclass should be created dynamically
	virtual ~mqtt_aclient();

	// @override dummy
	bool open_callback() { return true; }

	// @override
	bool timeout_callback();

	// @override
	void close_callback();

	// @override
	bool read_wakeup();

	// @override
	bool read_callback(char* data, int len);

protected:
	// wait for reading data from peer
	bool message_await();

	// virtual method called when resolving DNS failed.
	virtual void on_ns_failed() {}

	// virtual method called when it's timeout to connect mqtt server.
	virtual void on_connect_timeout() {}

	// virtual method called when it's failed to connect mqtt server.
	virtual void on_connect_failed() {}

	// virtual method called when reading timeout; the default
	// implementation returns false.
	virtual bool on_read_timeout() { return false; }

	// virtual method called when connection was disconnected.
	virtual void on_disconnect() {}

	// should be implemented by subclass.
	virtual bool on_open() = 0;

	// subclass can implement the method to override the default method,
	// the default implementation returns true.
	virtual bool on_header(const mqtt_header&) { return true; }

	// should be implemented by subclass.
	virtual bool on_body(const mqtt_message&) = 0;

private:
	aio_handle&        handle_;     // the aio handle passed in constructor
	sslbase_conf*      ssl_conf_;   // returned by get_ssl_conf(), may be NULL
	std::string        sni_prefix_;
	std::string        sni_suffix_;
	aio_socket_stream* conn_;       // returned by get_conn(), NULL if not connected
	int                rw_timeout_;
	std::string        host_;
	struct sockaddr_storage ns_addr_;
	struct sockaddr_storage serv_addr_;

	mqtt_header*       header_;
	mqtt_message*      body_;

	// called when mqtt connection was created, which can be used
	// for client or server.
	bool open_done();

	// used for ssl communication.
	bool handle_ssl_handshake();

	// handle the data received from mqtt connection.
	int handle_data(char* data, int len);

	// called if it's ok for connecting one mqtt server.
	bool handle_connect(const ACL_ASTREAM_CTX* ctx);

	// called by aio module of acl.
	static int connect_callback(const ACL_ASTREAM_CTX* ctx);
};
} // namespace acl