forked from luca/mineqtt
feat: add SUBSCRIBE
This commit is contained in:
parent
29678e712d
commit
8feda55bb8
51
mqtt.lua
51
mqtt.lua
|
@ -1,3 +1,5 @@
|
||||||
|
local computer = require("computer")
|
||||||
|
|
||||||
local mqtt = {}
|
local mqtt = {}
|
||||||
|
|
||||||
local safeRead = function (conn, n)
|
local safeRead = function (conn, n)
|
||||||
|
@ -74,7 +76,7 @@ function MqttClient:new (conn)
|
||||||
|
|
||||||
conn.readVarint = readVarint
|
conn.readVarint = readVarint
|
||||||
conn.safeRead = safeRead
|
conn.safeRead = safeRead
|
||||||
conn:setTimeout(1)
|
conn:setTimeout(0)
|
||||||
|
|
||||||
c.conn = conn
|
c.conn = conn
|
||||||
c.is_connecting = false
|
c.is_connecting = false
|
||||||
|
@ -89,7 +91,12 @@ function MqttClient:handle ()
|
||||||
end
|
end
|
||||||
|
|
||||||
local data, err = self.conn:safeRead(2)
|
local data, err = self.conn:safeRead(2)
|
||||||
|
|
||||||
if err ~= nil then
|
if err ~= nil then
|
||||||
|
if (string.find(err, "timeout")) then
|
||||||
|
return nil
|
||||||
|
end
|
||||||
|
|
||||||
return err
|
return err
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -137,8 +144,15 @@ function MqttClient:handle ()
|
||||||
self.is_connected = true
|
self.is_connected = true
|
||||||
elseif ptype & 0xF0 == 0x40 then -- PUBACK
|
elseif ptype & 0xF0 == 0x40 then -- PUBACK
|
||||||
-- TODO
|
-- TODO
|
||||||
|
elseif ptype & 0xF0 == 0x90 then -- SUBACK
|
||||||
|
-- TODO
|
||||||
elseif ptype & 0xF0 == 0xD0 then -- PINGRESP
|
elseif ptype & 0xF0 == 0xD0 then -- PINGRESP
|
||||||
-- TODO
|
-- TODO
|
||||||
|
elseif ptype & 0xF0 == 0x30 then -- PUBLISH
|
||||||
|
local topic, _, next = string.unpack("> s2 B", data)
|
||||||
|
local message = string.sub(data, next)
|
||||||
|
|
||||||
|
computer.pushSignal("mqtt_message", topic, message)
|
||||||
elseif ptype & 0xF0 == 0xE0 then -- DISCONNECT
|
elseif ptype & 0xF0 == 0xE0 then -- DISCONNECT
|
||||||
if ptype ~= 0xE0 then
|
if ptype ~= 0xE0 then
|
||||||
self:disconnect(0x81)
|
self:disconnect(0x81)
|
||||||
|
@ -229,6 +243,39 @@ function MqttClient:publish (topic, payload)
|
||||||
return nil
|
return nil
|
||||||
end
|
end
|
||||||
|
|
||||||
|
function MqttClient:subscribe (...)
|
||||||
|
if not (self.is_connecting or self.is_connected) then
|
||||||
|
return "no connection"
|
||||||
|
end
|
||||||
|
|
||||||
|
local count = select('#', ...)
|
||||||
|
|
||||||
|
if count == 0 then
|
||||||
|
return "topic is required"
|
||||||
|
end
|
||||||
|
|
||||||
|
local packet_id = 1
|
||||||
|
local data = string.pack("> I2 B", packet_id, 0)
|
||||||
|
|
||||||
|
for i = 1, count do
|
||||||
|
data = data .. string.pack("> s2 B", select(i, ...), 0)
|
||||||
|
end
|
||||||
|
|
||||||
|
local header = string.char(0x82) .. encodeVarint(#data)
|
||||||
|
|
||||||
|
local _, err = self.conn:write(header .. data)
|
||||||
|
if err ~= nil then
|
||||||
|
return err
|
||||||
|
end
|
||||||
|
|
||||||
|
local _, err = self.conn:flush()
|
||||||
|
if err ~= nil then
|
||||||
|
return err
|
||||||
|
end
|
||||||
|
|
||||||
|
return nil
|
||||||
|
end
|
||||||
|
|
||||||
function MqttClient:disconnect (reason)
|
function MqttClient:disconnect (reason)
|
||||||
if not (self.is_connecting or self.is_connected) then
|
if not (self.is_connecting or self.is_connected) then
|
||||||
return "no connection"
|
return "no connection"
|
||||||
|
@ -259,4 +306,4 @@ function MqttClient:disconnect (reason)
|
||||||
return nil
|
return nil
|
||||||
end
|
end
|
||||||
|
|
||||||
return mqtt
|
return mqtt
|
Loading…
Reference in New Issue