调用Mosquitto实现mqtt物联网通信
By
admin
at 2021-09-06 • 1人收藏 • 1271人看过
感谢: indertust 分享代码
MQTT 工作在 TCP/IP 协议族之上，是为硬件性能低下的远程设备以及网络状况糟糕的环境而设计的发布/订阅型（publish/subscribe）消息协议。
此处使用的开源组件官方: https://mosquitto.org
目前代码还有几个问题未解决, 先分享出来供大家参考.
Mosquitto.aardio库代码如下:
//Mosquitto
/*
参考:
https://mosquitto.org/man/mosquitto-8.html
https://mosquitto.org/man/libmosquitto-3.html
*/
/*
Mosquitto: MQTT client object built on the libmosquitto bindings declared in
the Mosquitto namespace.

The constructor copies every field of tParam onto the instance, fills in
defaults, and starts a worker thread that creates the native client, connects
to the broker and runs the blocking network loop. Events raised in the worker
thread are forwarded through thread.command to the optional instance callbacks:
	onMessage(topic, message), onConnect(result), onSubscribe(mid),
	onPublish(mid), onLog(str)

Fields read from tParam (all optional):
	host          broker address, default '127.0.0.1'
	port          broker port, default 1883
	keepalive     keep-alive value passed to connect, default 60
	clean_session boolean passed to Mosquitto.new, default true
	subTopic      topic subscribed after connecting,
	              default '$SYS/broker/messages/received'
	id            client id; a random 12-character id is generated when omitted
	winform       window handed to thread.command() as the command listener
*/
class Mosquitto{
	ctor(tParam){
		// Copy caller-supplied options and callbacks onto the instance.
		table.mixin(this, tParam)

		// Apply defaults for anything missing or of the wrong type.
		if(type(this.clean_session) != type.boolean){
			this.clean_session = true
		}
		if(type(this.keepalive) != type.number){
			this.keepalive = 60
		}
		if(this.host == null){
			this.host = '127.0.0.1'
		}
		if(type(this.port) != type.number){
			this.port = 1883
		}
		if(this.subTopic == null){
			this.subTopic = '$SYS/broker/messages/received'
		}

		// Command listener living in the creating thread; the worker thread
		// invokes these handlers by name via thread.command.<name>(...).
		this.listener = thread.command(this.winform)

		// Stores the latest status text reported by the worker thread.
		this.listener.setInfo = function(info){
			this.info = info
		}
		// Each handler below forwards to the user callback only when it is a function.
		this.listener.onMessage = function(topic, message){
			if(type(this.onMessage) == type.function){
				this.onMessage(topic, message)
			}
		}
		this.listener.onConnect = function(result){
			if(type(this.onConnect) == type.function){
				this.onConnect(result)
			}
		}
		this.listener.onSubscribe = function(mid){
			if(type(this.onSubscribe) == type.function){
				this.onSubscribe(mid)
			}
		}
		this.listener.onPublish = function(mid){
			if(type(this.onPublish) == type.function){
				this.onPublish(mid)
			}
		}
		this.listener.onLog = function(str){
			if(type(this.onLog) == type.function){
				this.onLog(str)
			}
		}

		// Worker thread: owns the native client and blocks in loop_forever.
		this.threadHandle = thread.create(
			function(host, port, subTopic, keepalive, clean_session, id){
				import thread.command
				import Mosquitto
				import console

				var listener = thread.command()
				// Struct template used to decode the native message pointer.
				var mm = Mosquitto.mosquitto_message()

				// Fix: generate a client id only when the caller did NOT supply one
				// (the original test was inverted and discarded the caller's id).
				if(!id){
					id = string.random(12)
				}

				// Message arrived: decode the struct and forward non-empty payloads.
				var my_message_callback = function(mosq, userdata, message){
					var msg = raw.convert(message,mm)
					if(msg.payloadlen > 0){
						thread.command.onMessage(msg.topic, msg.payload)
					}
				}
				var my_message_callback_cdecl = raw.tocdecl(my_message_callback, "void(pointer, pointer, pointer)" )

				// Connection result: a zero result is treated as success; then
				// subscribe to subTopic and publish a greeting on the same topic.
				var my_connect_callback = function(mosq, userdata, result){
					if(!result){
						thread.command.setInfo('连接成功')
						thread.command.onConnect(result)
						if(subTopic){
							var ret, mid = Mosquitto.subscribe(mosq, 0, subTopic, 2);
							var qos = 0
							var retain = false
							var message = 'I am here, ' ++ string.random(10)
							Mosquitto.publish(mosq, mid, subTopic, #message, message, qos, retain)
						}
					}else{
						thread.command.setInfo('连接失败')
					}
				}
				var my_connect_callback_cdecl = raw.tocdecl(my_connect_callback, "void(pointer, pointer, int)" )

				var my_subscribe_callback = function(mosq, userdata, mid, qos_count, granted_qos){
					thread.command.onSubscribe(mid)
				}
				var my_subscribe_callback_cdecl = raw.tocdecl(my_subscribe_callback, "void(pointer, pointer, int, int, int &)" )

				var my_log_callback = function(mosq, userdata, level, str){
					thread.command.onLog(str)
				}
				var my_log_callback_cdecl = raw.tocdecl(my_log_callback, "void(pointer, pointer, int, string)" )

				//publish_callback_set= dll.api("mosquitto_publish_callback_set","void(pointer mosq, void (&on_publish)(struct &, pointer , int))");
				var my_publish_callback = function(mosq, obj, mid){
					thread.command.onPublish(mid)
				}
				var my_publish_callback_cdecl = raw.tocdecl(my_publish_callback, "void(pointer, pointer, int)" )

				Mosquitto.init()
				var mosq = Mosquitto.new(id, clean_session, null)
				if(!mosq){
					// Fix: stop here instead of calling connect with a null client.
					thread.command.setInfo('内存不足')
					Mosquitto.cleanup();
					return;
				}

				Mosquitto.log_callback_set(mosq, my_log_callback_cdecl);
				Mosquitto.connect_callback_set(mosq, my_connect_callback_cdecl);
				Mosquitto.message_callback_set(mosq, my_message_callback_cdecl);
				Mosquitto.subscribe_callback_set(mosq, my_subscribe_callback_cdecl);
				Mosquitto.publish_callback_set(mosq, my_publish_callback_cdecl);

				// connect returns a non-zero error code on failure.
				if(Mosquitto.connect(mosq, host, port, keepalive)){
					thread.command.setInfo('连接失败')
				}
				else {
					thread.command.setInfo('连接成功')
					// NOTE(review): this handler is registered on the worker thread's
					// listener, but loop_forever below blocks the thread, so it is
					// presumably never dispatched (the author notes publishing is blocked).
					listener.pub = function(topic, mid, message, qos, retain){
						Mosquitto.publish(mosq, mid, topic, #message, message, qos, retain)
					}
					Mosquitto.loop_forever(mosq, -1, 1);
				}

				// Fix: release the client on BOTH paths, and call the binding that
				// actually exists (the namespace defines cleanup, not lib_cleanup).
				Mosquitto.destroy(mosq);
				Mosquitto.cleanup();
			},this.host, this.port, this.subTopic, this.keepalive, this.clean_session, this.id
		)
	};
	sub = function(topic){
		// Not used for now: blocked.
	};
	pub = function(topic, mid, message, qos, retain){
		// Not used for now: blocked.
		//thread.command.pub(topic, mid, message, qos, retain)
	};
	/*
	Closes the worker thread handle. This only releases the handle; nothing
	here asks the network loop to stop.
	*/
	close = function(){
		if(this.threadHandle){
			raw.closehandle(this.threadHandle)
			// Forget the handle so a repeated close() cannot close it twice.
			this.threadHandle = null
		}
	}
}
/*
Namespace Mosquitto: constants, the message struct and the raw cdecl bindings
to mosquitto.dll. The bindings are only created when the DLL file exists at
dllPath; otherwise the namespace contains just the constants and the struct.
*/
namespace Mosquitto{
	// Re-export the global libraries used by the Mosquitto class body.
	table = ..table;
	string = ..string;
	io = ..io;
	raw = ..raw;
	import thread;
	import thread.command;

	dllPath = "/mosquitto.dll"

	// Error codes returned by the libmosquitto functions.
	MOSQ_ERR_CONN_PENDING = -1;
	MOSQ_ERR_SUCCESS = 0;
	MOSQ_ERR_NOMEM = 1;
	MOSQ_ERR_PROTOCOL = 2;
	MOSQ_ERR_INVAL = 3;
	MOSQ_ERR_NO_CONN = 4;
	MOSQ_ERR_CONN_REFUSED = 5;
	MOSQ_ERR_NOT_FOUND = 6;
	MOSQ_ERR_CONN_LOST = 7;
	MOSQ_ERR_TLS = 8;
	MOSQ_ERR_PAYLOAD_SIZE = 9;
	MOSQ_ERR_NOT_SUPPORTED = 10;
	MOSQ_ERR_AUTH = 11;
	MOSQ_ERR_ACL_DENIED = 12;
	MOSQ_ERR_UNKNOWN = 13;
	MOSQ_ERR_ERRNO = 14;
	MOSQ_ERR_EAI = 15;
	MOSQ_ERR_PROXY = 16

	// Struct used with raw.convert to decode the message pointer handed to
	// the message callback.
	// NOTE(review): payload is declared as string, so binary payloads with
	// embedded zero bytes are presumably truncated - confirm.
	class mosquitto_message {
		int mid;
		string topic;
		string payload;
		int payloadlen;
		int qos;
		bool retain;
	};

	if(io.exist(dllPath)){
		dll = raw.loadDll(dllPath,,"cdecl")
		if(dll){
			version= dll.api("mosquitto_lib_version","int(int &major, int &minor, int &revision)");

			// Returns the library version formatted as "major.minor.revision",
			// followed by the raw return value of mosquitto_lib_version.
			getVersion = function(){
				var major, minor, revision = 0,0,0
				var ret, major, minor, revision = version(major, minor, revision)
				return string.format("%d.%d.%d", major, minor, revision), ret
			}

			init= dll.api("mosquitto_lib_init","int(void)");
			cleanup= dll.api("mosquitto_lib_cleanup","int(void)");
			// Fix: mosquitto_new returns a client handle that is passed to every
			// "pointer mosq" parameter below, so declare the return as pointer.
			new = dll.api("mosquitto_new","pointer(string id, bool clean_session, pointer obj)");
			destroy= dll.api("mosquitto_destroy","void(pointer mosq)");
			username_pw_set= dll.api("mosquitto_username_pw_set","int(pointer mosq, string username, string password)");
			connect= dll.api("mosquitto_connect","int(pointer mosq, string host, int port, int keepalive)");
			disconnect= dll.api("mosquitto_disconnect","int(pointer mosq)");
			publish= dll.api("mosquitto_publish","int(pointer mosq, int &mid, string topic, int payloadlen, string payload, int qos, bool retain)");
			subscribe= dll.api("mosquitto_subscribe","int(pointer mosq, int &mid, string sub, int qos)");
			unsubscribe= dll.api("mosquitto_unsubscribe","int(pointer mosq, int &mid, string sub)");
			// NOTE(review): "struct_message" is not an obviously valid parameter
			// type name; these two bindings are unused in this file - verify
			// before relying on them.
			message_copy= dll.api("mosquitto_message_copy","int(struct_message &dst, struct_message &src)");
			message_free= dll.api("mosquitto_message_free","void(struct_message &&message)");
			loop= dll.api("mosquitto_loop","int(pointer mosq, int timeout, int max_packets)");
			loop_forever= dll.api("mosquitto_loop_forever","int(pointer mosq, int timeout, int max_packets)");
			loop_start= dll.api("mosquitto_loop_start","int(pointer mosq)");
			loop_stop= dll.api("mosquitto_loop_stop","int(pointer mosq, bool force)");
			// Callback setters take a native function pointer created with raw.tocdecl.
			connect_callback_set= dll.api("mosquitto_connect_callback_set","void(pointer mosq, pointer callback)");
			disconnect_callback_set= dll.api("mosquitto_disconnect_callback_set","void(pointer mosq, pointer callback)");
			publish_callback_set= dll.api("mosquitto_publish_callback_set","void(pointer mosq, pointer callback)");
			message_callback_set= dll.api("mosquitto_message_callback_set","void(pointer mosq, pointer callback)");
			subscribe_callback_set= dll.api("mosquitto_subscribe_callback_set","void(pointer mosq, pointer callback)");
			unsubscribe_callback_set= dll.api("mosquitto_unsubscribe_callback_set","void(pointer mosq, pointer callback)");
			log_callback_set= dll.api("mosquitto_log_callback_set","void(pointer mosq, pointer callback)");
			strerror= dll.api("mosquitto_strerror","string(int mosq_errno)");
		}
	}
}
/**intellisense()
Mosquitto = Mosquitto
Mosquitto(.(tParam) = 创建 Mosquitto
Mosquitto() = !Mosquitto.
end intellisense**/
/**intellisense(!Mosquitto)
end intellisense**/

mainform.aardio 调用代码如下:
// Demo window for the Mosquitto library: Start creates a client from the
// parameter table typed into editParam, Stop/close release it, and every
// callback writes a line into editLog.
import win.ui;
/*DSG{{*/
mainForm = win.form(text="MosquittoTest";right=672;bottom=450)
mainForm.add(
buttonPublish={cls="button";text="Publish";left=564;top=103;right=659;bottom=137;z=6};
buttonStart={cls="button";text="Start";left=564;top=21;right=659;bottom=55;z=1};
buttonStop={cls="button";text="Stop";left=564;top=62;right=659;bottom=96;z=4};
buttonTest={cls="button";text="说明";left=564;top=144;right=659;bottom=178;z=2};
editLog={cls="edit";left=9;top=178;right=548;bottom=441;autohscroll=false;edge=1;multiline=1;vscroll=1;z=3};
editParam={cls="edit";left=9;top=8;right=548;bottom=169;autohscroll=false;edge=1;multiline=1;vscroll=1;z=5}
)
/*}}*/
/*
import console;
console.open()
execute("mode con cols=100 lines=500")
execute("color 0A")
var oh = console.getOutPutHandle()
var ih = console.getInputHandle()
console.modifyMode(ih,0,0x40)
p = function(...){
console.print(time(), ...)
}
d = console.dump
dj = console.dumpJson
vd = console.varDump
*/
import Mosquitto

// The currently running client, or null when stopped.
var mosquitto

// The callbacks below are deliberately global: the parameter text evaluated
// by the Start button refers to them (and to mainForm) by name.
onMessage = function(topic, message){
	mainForm.editLog.print('收到消息', topic, message)
}
onConnect = function(result){
	mainForm.editLog.print('连接', result)
}
onSubscribe = function(mid){
	mainForm.editLog.print('订阅 mid=', mid)
}
onPublish= function(mid){
	mainForm.editLog.print('发布 mid=', mid)
}
// Fix: the original assigned this twice ("onLog=onLog = function").
onLog = function(str){
	mainForm.editLog.print('日志', str)
}

// Releases the current client, if any. Shared by Start, Stop and onClose.
var stopClient = function(){
	if(mosquitto){
		mosquitto.close()
		mosquitto = null
	}
}

mainForm.buttonStart.oncommand = function(id,event){
	mainForm.editLog.print('version', Mosquitto.getVersion())
	var sparam = mainForm.editParam.text
	// WARNING: eval executes whatever text is in the edit box as code.
	// Acceptable for a local test tool; never feed it untrusted input.
	var param = eval(sparam)
	if(type(param) != type.table){
		mainForm.editLog.print('invalid parameters: expected a table')
		return;
	}
	// Release any previous client before replacing the reference to it.
	stopClient()
	mosquitto = Mosquitto(param)
}
mainForm.buttonTest.oncommand = function(id,event){
var msg = /*
0、下载安装 https://mosquitto.org/download/
1、先启动服务:mosquitto.exe -v -p 61613
2、运行本程序,点 "Start" 按钮
3、可用下面的命令发布消息测试
mosquitto_pub.exe -h 127.0.0.1 -p 61613 -t $SYS/broker/messages/received -m message_you_wanna_send
4、主要为收消息而做,可能 Mosquitto.loop_forever 阻塞了,无法发消息
*/
	mainForm.editLog.print(msg)
}
mainForm.onClose = function(hwnd,message,wParam,lParam){
	stopClient()
}
mainForm.buttonStop.oncommand = function(id,event){
	stopClient()
}
// Template for the (currently disabled) Publish button.
var strPublish = /*
{
topic = '$SYS/broker/messages/received';
mid = 0;
message = string.random(20);
qos = 0;
retain = false;
}
*/
// Default connection parameters shown in editParam.
// Fix: the key must be "host" - the Mosquitto class never reads "ip", so the
// address typed here used to be ignored in favour of the built-in default.
var strConnect = /*
{
host = '127.0.0.1';
port = 61613;
onMessage = onMessage;
onConnect = onConnect;
onSubscribe = onSubscribe;
onPublish = onPublish;
onLog = onLog;
winform = mainForm;
}
*/
mainForm.editParam.text = strConnect
mainForm.buttonPublish.oncommand = function(id,event){
	/*
	if(mosquitto){
	var sparam = mainForm.editParam.text
	var param = eval(sparam)
	mosquitto.pub(param.topic, param.mid, param.message, param.qos, param.retain)
	}
	*/
}
mainForm.editParam.oncommand = function(id,event){
}
mainForm.show();
return win.loopMessage();

工程下载:
MosquittoTest 2021-9-5 1413.rar
登录后方可回帖