From 1e2a0b52d9349c6a93744681e60153c4630f754c Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Sat, 3 Jan 2015 11:43:54 +0100 Subject: [PATCH 01/13] make 'old' flukso daemon publish configuration data --- openwrt/package/flukso/luasrc/fluksod.lua | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/openwrt/package/flukso/luasrc/fluksod.lua b/openwrt/package/flukso/luasrc/fluksod.lua index 6f657298..ae149362 100755 --- a/openwrt/package/flukso/luasrc/fluksod.lua +++ b/openwrt/package/flukso/luasrc/fluksod.lua @@ -126,6 +126,15 @@ mosq.init() local mqtt = mosq.new(MOSQ_ID, MOSQ_CLN_SESSION) mqtt:connect(MOSQ_HOST, MOSQ_PORT, MOSQ_KEEPALIVE) +-- publish configuration data +local luci = require "luci" +luci.json = require "luci.json" +local DEVICE = uci:get_first('system', 'system', 'device') +local MOSQ_TOPIC_SENSOR_CONFIG = string.format('/device/%s/config/sensor', DEVICE) +local flm_config = luci.json.encode(FLUKSO) +mqtt:publish(MOSQ_TOPIC_SENSOR_CONFIG, flm_config, MOSQ_QOS, MOSQ_RETAIN) +-- end of publishing configuration + local function dispatch(wan_child, lan_child) return coroutine.create(function() local delta = { From 955c4e077e1afacd0217c4a793044a2619d6204d Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Fri, 30 Jan 2015 20:33:51 +0100 Subject: [PATCH 02/13] Preparation of query on tempo data Prepare the tmpo daemon to publish query data on request --- openwrt/package/flukso/luasrc/tmpod.lua | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/openwrt/package/flukso/luasrc/tmpod.lua b/openwrt/package/flukso/luasrc/tmpod.lua index ced48c31..0142fbf6 100755 --- a/openwrt/package/flukso/luasrc/tmpod.lua +++ b/openwrt/package/flukso/luasrc/tmpod.lua @@ -75,6 +75,10 @@ local TMPO_TOPIC_SYNC_PUB = "/device/%s/tmpo/sync" local TMPO_TOPIC_SENSOR_SUB = "/sensor/+/+" local TMPO_TOPIC_SENSOR_PUB = "/sensor/%s/tmpo/%d/%d/%d/gz" -- /sensor/[sid]/tmpo/[rid]/[lvl]/[bid]/gz + +local TMPO_TOPIC_QUERY_PUB = "/sensor/%s/query" -- provide queried data as payload +local TMPO_TOPIC_QUERY_SUB = "/query/+" -- get sensor to query with payload interval + local TMPO_GC20_THRESHOLD = 100 -- 100 free 4kB blocks out of +-1000 in jffs2 = 90% full local TMPO_GZCHECK_EXEC_FMT = "gzip -trS '' %s 2>&1" local TMPO_GZCHECK_FILE_REGEX = "^gzip:%s*([%w%.%-_/]+):.*$" @@ -103,6 +107,8 @@ while not mqtt:connect(MOSQ_HOST, MOSQ_PORT, MOSQ_KEEPALIVE) do end mqtt:subscribe(TMPO_TOPIC_SYNC_SUB:format(DEVICE), MOSQ_QOS0) mqtt:subscribe(TMPO_TOPIC_SENSOR_SUB, MOSQ_QOS0) +-- subscribe to query topic +mqtt:subscribe(TMPO_TOPIC_QUERY_SUB, MOSQ_QOS0) local config = { sensor = nil, @@ -613,9 +619,16 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) tmpo:sync1(payload) end + local function query(sid) + local payload = luci.json.decode(jpayload) + return true + end + if retain then return end if not sensor(topic:match(TMPO_REGEX_SENSOR)) then - sync(topic:match(TMPO_REGEX_SYNC)) + if not query(topic:match(TMPO_REGEX_SENSOR)) then + sync(topic:match(TMPO_REGEX_SYNC)) + end end end) From 94a92147c60b66125531a6e58a7bb611564fac0b Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Sat, 14 Feb 2015 17:46:04 +0100 Subject: [PATCH 03/13] next step in providing tmpo data on query --- openwrt/package/flukso/luasrc/tmpod.lua | 39 ++++++++++++++++++++----- 1 file changed, 31 insertions(+), 8 deletions(-) diff --git a/openwrt/package/flukso/luasrc/tmpod.lua b/openwrt/package/flukso/luasrc/tmpod.lua index 0142fbf6..fa312524 100755 --- a/openwrt/package/flukso/luasrc/tmpod.lua +++ b/openwrt/package/flukso/luasrc/tmpod.lua @@ -70,6 +70,7 @@ local TMPO_REGEX_V1 = '^,"v":%[0(.*)$' local TMPO_REGEX_V2 = '^(.-)%].*$' local TMPO_REGEX_SYNC = "^/d/device/(%x+)/tmpo/sync$" local TMPO_REGEX_SENSOR = "^/sensor/(%x+)/(%l+)$" +local TMPO_REGEX_QUERY = "^/query/(%x+)/tmpo$" local TMPO_TOPIC_SYNC_SUB = "/d/device/%s/tmpo/sync" local TMPO_TOPIC_SYNC_PUB = "/device/%s/tmpo/sync" local TMPO_TOPIC_SENSOR_SUB = "/sensor/+/+" @@ -77,7 +78,7 @@ local TMPO_TOPIC_SENSOR_PUB = "/sensor/%s/tmpo/%d/%d/%d/gz" -- /sensor/[sid]/tmpo/[rid]/[lvl]/[bid]/gz local TMPO_TOPIC_QUERY_PUB = "/sensor/%s/query" -- provide queried data as payload -local TMPO_TOPIC_QUERY_SUB = "/query/+" -- get sensor to query with payload interval +local TMPO_TOPIC_QUERY_SUB = "/query/+/tmpo" -- get sensor to query with payload interval local TMPO_GC20_THRESHOLD = 100 -- 100 free 4kB blocks out of +-1000 in jffs2 = 90% full local TMPO_GZCHECK_EXEC_FMT = "gzip -trS '' %s 2>&1" @@ -604,6 +605,15 @@ local tmpo = { } mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) + local function sdir(path) + local files = { } + for file in nixio.fs.dir(path) or function() end do --dummy iterator + files[#files + 1] = tonumber(file) or file + end + table.sort(files) + return files + end + local function sensor(sid, dtype) local sparams = config.sensor[sid] if not (sid and sparams and dtype == sparams.data_type) then return end @@ -619,16 +629,29 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) tmpo:sync1(payload) end - local function query(sid) - local payload = luci.json.decode(jpayload) - return true - end +-- publish the stored files on a query request + local function query(sid) +-- payload contains query time interval {from:fromtimestamp, to:totimestamp} + local payload = luci.json.decode(jpayload) + for rid in nixio.fs.dir(TMPO_BASE_PATH .. sid) do + for _, lvl in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, "", ""))) do + for _, bid in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, lvl, ""))) do + if bid >= payload.from and bid <= payload.to + -- publish the respective file containing the requested values + -- note, the query interval may be smaller than a file's content + -- then the respective file must be sent + end + end + end + end + return true + end if retain then return end if not sensor(topic:match(TMPO_REGEX_SENSOR)) then - if not query(topic:match(TMPO_REGEX_SENSOR)) then - sync(topic:match(TMPO_REGEX_SYNC)) - end + if not query(topic:match(TMPO_REGEX_QUERY)) then + sync(topic:match(TMPO_REGEX_SYNC)) + end end end) From 93033759e755aaf68c1251d937375e3c92ce80c6 Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Sun, 15 Feb 2015 16:21:18 +0100 Subject: [PATCH 04/13] tmpo daemaon with query callback --- openwrt/package/flukso/luasrc/tmpod.lua | 69 ++++++++++++++++++------- 1 file changed, 50 insertions(+), 19 deletions(-) diff --git a/openwrt/package/flukso/luasrc/tmpod.lua b/openwrt/package/flukso/luasrc/tmpod.lua index fa312524..4a2ea5e0 100755 --- a/openwrt/package/flukso/luasrc/tmpod.lua +++ b/openwrt/package/flukso/luasrc/tmpod.lua @@ -39,7 +39,8 @@ local DEBUG = { config = false, block8 = false, compact = false, - sync = false + sync = false, + query = false } local DAEMON = os.getenv("DAEMON") or "tmpod" @@ -70,15 +71,16 @@ local TMPO_REGEX_V1 = '^,"v":%[0(.*)$' local TMPO_REGEX_V2 = '^(.-)%].*$' local TMPO_REGEX_SYNC = "^/d/device/(%x+)/tmpo/sync$" local TMPO_REGEX_SENSOR = "^/sensor/(%x+)/(%l+)$" -local TMPO_REGEX_QUERY = "^/query/(%x+)/tmpo$" local TMPO_TOPIC_SYNC_SUB = "/d/device/%s/tmpo/sync" local TMPO_TOPIC_SYNC_PUB = "/device/%s/tmpo/sync" local TMPO_TOPIC_SENSOR_SUB = "/sensor/+/+" local TMPO_TOPIC_SENSOR_PUB = "/sensor/%s/tmpo/%d/%d/%d/gz" -- /sensor/[sid]/tmpo/[rid]/[lvl]/[bid]/gz +local TMPO_REGEX_QUERY = "^/query/(%x+)/tmpo$" local TMPO_TOPIC_QUERY_PUB = "/sensor/%s/query" -- provide queried data as payload local TMPO_TOPIC_QUERY_SUB = "/query/+/tmpo" -- get sensor to query with payload interval +local TMPO_FMT_QUERY = "time:%d sid:%s rid:%d lvl:%2d bid:%d" local TMPO_GC20_THRESHOLD = 100 -- 100 free 4kB blocks out of +-1000 in jffs2 = 90% full local TMPO_GZCHECK_EXEC_FMT = "gzip -trS '' %s 2>&1" @@ -94,6 +96,7 @@ local MOSQ_TIMEOUT = 0 -- return instantly from select call local MOSQ_MAX_PKTS = 1 -- packets local MOSQ_QOS0 = 0 local MOSQ_QOS1 = 1 +local MOSQ_QOS2 = 2 local MOSQ_RETAIN = true local MOSQ_ERROR = "MQTT error: %s" @@ -614,6 +617,27 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) return files end + local function dprint(fmt, ...) + if DEBUG.query then + print(fmt:format( + os.time(), + ...)) + end + end + + local function publish(sid, rid, lvl, bid) + dprint(TMPO_FMT_QUERY, sid, rid, lvl, bid) + local path = TMPO_PATH_TPL:format(sid, rid, lvl, bid) + local source = assert(io.open(path, "r")) + local payload = source:read("*all") + local topic = TMPO_TOPIC_QUERY_PUB:format(sid) + if DEBUG.query then + print("publishing", topic, payload) + end + mqtt:publish(topic, payload, MOSQ_QOS2, not MOSQ_RETAIN) + source:close() + end + local function sensor(sid, dtype) local sparams = config.sensor[sid] if not (sid and sparams and dtype == sparams.data_type) then return end @@ -629,23 +653,30 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) tmpo:sync1(payload) end --- publish the stored files on a query request - local function query(sid) --- payload contains query time interval {from:fromtimestamp, to:totimestamp} - local payload = luci.json.decode(jpayload) - for rid in nixio.fs.dir(TMPO_BASE_PATH .. sid) do - for _, lvl in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, "", ""))) do - for _, bid in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, lvl, ""))) do - if bid >= payload.from and bid <= payload.to - -- publish the respective file containing the requested values - -- note, the query interval may be smaller than a file's content - -- then the respective file must be sent - end - end - end - end - return true - end + -- publish the stored files on a query request + local function query(sid) + -- payload contains query time interval [fromtimestamp, totimestamp] + local payload = luci.json.decode(jpayload) + local lastbid = 0 + if DEBUG.query then + print("entered query with ", sid, payload[1], payload[2]) + end + for rid in nixio.fs.dir(TMPO_BASE_PATH .. sid) do + for _, lvl in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, "", ""))) do + for _, bid in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, lvl, ""))) do + -- detect store with containing or overlapping values + if ((payload[1] <= bid) and (bid <= payload[2])) then + publish(sid, rid, lvl, bid) + end + if ((lastbid ~= 0) and (bid >= payload[2]) and (lastbid <= payload[1])) then + publish(sid, rid, lvl, lastbid) + end + lastbid = bid + end + end + end + return true + end if retain then return end if not sensor(topic:match(TMPO_REGEX_SENSOR)) then From 8ee66f14a8b181ed6aa501eebafba85b9117c8f8 Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Mon, 16 Feb 2015 08:54:22 +0100 Subject: [PATCH 05/13] return conditions/interval boundaries --- openwrt/package/flukso/luasrc/tmpod.lua | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/openwrt/package/flukso/luasrc/tmpod.lua b/openwrt/package/flukso/luasrc/tmpod.lua index 5295e790..3b5a9dbd 100755 --- a/openwrt/package/flukso/luasrc/tmpod.lua +++ b/openwrt/package/flukso/luasrc/tmpod.lua @@ -79,7 +79,7 @@ local TMPO_TOPIC_SENSOR_PUB = "/sensor/%s/tmpo/%d/%d/%d/gz" -- /sensor/[sid]/tmpo/[rid]/[lvl]/[bid]/gz local TMPO_REGEX_QUERY = "^/query/(%x+)/tmpo$" -local TMPO_TOPIC_QUERY_PUB = "/sensor/%s/query" -- provide queried data as payload +local TMPO_TOPIC_QUERY_PUB = "/sensor/%s/query/%s/%s" -- provide queried data as payload local TMPO_TOPIC_QUERY_SUB = "/query/+/tmpo" -- get sensor to query with payload interval local TMPO_FMT_QUERY = "time:%d sid:%s rid:%d lvl:%2d bid:%d" @@ -643,12 +643,12 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) end end - local function publish(sid, rid, lvl, bid) + local function publish(sid, rid, lvl, bid, from, to) dprint(TMPO_FMT_QUERY, sid, rid, lvl, bid) local path = TMPO_PATH_TPL:format(sid, rid, lvl, bid) local source = assert(io.open(path, "r")) local payload = source:read("*all") - local topic = TMPO_TOPIC_QUERY_PUB:format(sid) + local topic = TMPO_TOPIC_QUERY_PUB:format(sid, from, to) if DEBUG.query then print("publishing", topic, payload) end @@ -673,21 +673,25 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) -- publish the stored files on a query request local function query(sid) + if not sid then return end -- payload contains query time interval [fromtimestamp, totimestamp] local payload = luci.json.decode(jpayload) + if payload == nil then return end local lastbid = 0 + local from = payload[1] + local to = payload[2] if DEBUG.query then - print("entered query with ", sid, payload[1], payload[2]) + print("entered query with ", sid, from, to) end for rid in nixio.fs.dir(TMPO_BASE_PATH .. sid) do for _, lvl in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, "", ""))) do for _, bid in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, lvl, ""))) do -- detect store with containing or overlapping values - if ((payload[1] <= bid) and (bid <= payload[2])) then - publish(sid, rid, lvl, bid) + if ((from <= bid) and (bid <= to)) then + publish(sid, rid, lvl, bid, from, to) end - if ((lastbid ~= 0) and (bid >= payload[2]) and (lastbid <= payload[1])) then - publish(sid, rid, lvl, lastbid) + if ((lastbid ~= 0) and (lastbid < from) and (bid > to)) then + publish(sid, rid, lvl, lastbid, from, to) end lastbid = bid end From af1907f385fb0f2ea8e869758f0aa663b69230b0 Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Sun, 22 Feb 2015 09:28:28 +0100 Subject: [PATCH 06/13] overlapping intervals corrected --- openwrt/package/flukso/luasrc/tmpod.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openwrt/package/flukso/luasrc/tmpod.lua b/openwrt/package/flukso/luasrc/tmpod.lua index 3b5a9dbd..e9eca34c 100755 --- a/openwrt/package/flukso/luasrc/tmpod.lua +++ b/openwrt/package/flukso/luasrc/tmpod.lua @@ -690,7 +690,7 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) if ((from <= bid) and (bid <= to)) then publish(sid, rid, lvl, bid, from, to) end - if ((lastbid ~= 0) and (lastbid < from) and (bid > to)) then + if ((lastbid ~= 0) and (lastbid < from) and (bid > from)) then publish(sid, rid, lvl, lastbid, from, to) end lastbid = bid From b212a0917569fcaf68dd2656428bdac045c433fc Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Mon, 23 Feb 2015 12:31:41 +0100 Subject: [PATCH 07/13] [tmpo] correct query order of storage --- openwrt/package/flukso/luasrc/tmpod.lua | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/openwrt/package/flukso/luasrc/tmpod.lua b/openwrt/package/flukso/luasrc/tmpod.lua index e9eca34c..b3893ed9 100755 --- a/openwrt/package/flukso/luasrc/tmpod.lua +++ b/openwrt/package/flukso/luasrc/tmpod.lua @@ -677,6 +677,7 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) -- payload contains query time interval [fromtimestamp, totimestamp] local payload = luci.json.decode(jpayload) if payload == nil then return end + local lastlvl = 0 local lastbid = 0 local from = payload[1] local to = payload[2] @@ -684,15 +685,16 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) print("entered query with ", sid, from, to) end for rid in nixio.fs.dir(TMPO_BASE_PATH .. sid) do - for _, lvl in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, "", ""))) do + for _, lvl in ipairs(TMPO_LVLS_REVERSE) do -- storage has to be queried from past to now for _, bid in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, lvl, ""))) do -- detect store with containing or overlapping values if ((from <= bid) and (bid <= to)) then publish(sid, rid, lvl, bid, from, to) end if ((lastbid ~= 0) and (lastbid < from) and (bid > from)) then - publish(sid, rid, lvl, lastbid, from, to) + publish(sid, rid, lastlvl, lastbid, from, to) end + lastlvl = lvl lastbid = bid end end From 2633729e9de406b301fa0b18c1e0fa98d61fc020 Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Thu, 26 Feb 2015 08:09:12 +0100 Subject: [PATCH 08/13] [tmpo] query potential data outside interval E.g. for solar readings it can happen that there is a compressed file on level 12, but no further readings; thus, publish this file as it might contain query relevant data (example: solar readings ended at 15:45, query is 12:00 to 18:00, compressed solar file starts before 12:00 - without correction it wouldn't be read) --- openwrt/package/flukso/luasrc/tmpod.lua | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/openwrt/package/flukso/luasrc/tmpod.lua b/openwrt/package/flukso/luasrc/tmpod.lua index b3893ed9..3d3d407a 100755 --- a/openwrt/package/flukso/luasrc/tmpod.lua +++ b/openwrt/package/flukso/luasrc/tmpod.lua @@ -650,7 +650,8 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) local payload = source:read("*all") local topic = TMPO_TOPIC_QUERY_PUB:format(sid, from, to) if DEBUG.query then - print("publishing", topic, payload) + local str = string.format("publishing topic:%s payload:%s", topic, payload) + print(str) end merror(mqtt:publish(topic, payload, MOSQ_QOS2, not MOSQ_RETAIN)) source:close() @@ -677,12 +678,15 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) -- payload contains query time interval [fromtimestamp, totimestamp] local payload = luci.json.decode(jpayload) if payload == nil then return end + local lastrid = 0 local lastlvl = 0 local lastbid = 0 + local published = false local from = payload[1] local to = payload[2] if DEBUG.query then - print("entered query with ", sid, from, to) + local str = string.format("entered sensor:%s from:%s to:%s", sid, from, to) + print(str) end for rid in nixio.fs.dir(TMPO_BASE_PATH .. sid) do for _, lvl in ipairs(TMPO_LVLS_REVERSE) do -- storage has to be queried from past to now @@ -690,15 +694,27 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) -- detect store with containing or overlapping values if ((from <= bid) and (bid <= to)) then publish(sid, rid, lvl, bid, from, to) + published = true end if ((lastbid ~= 0) and (lastbid < from) and (bid > from)) then publish(sid, rid, lastlvl, lastbid, from, to) + published = true end + -- recognize overlaps in different compression stages + lastrid = rid lastlvl = lvl lastbid = bid + if DEBUG.query then + str = string.format("processed file /%s/%s/%s", rid, lvl, bid) + print(str) + end end end end + -- send last stored file in case there were no further readings, e.g. on solar + if ((published == false) and (lastbid < from)) then + publish(sid, lastrid, lastlvl, lastbid, from, to) + end return true end From e9f1278984604e8405c3bc601ce0170337d3a9c4 Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Sat, 27 Jun 2015 15:26:42 +0200 Subject: [PATCH 09/13] config publishing in tmpod --- openwrt/package/flukso/luasrc/fluksod.lua | 9 --------- 1 file changed, 9 deletions(-) diff --git a/openwrt/package/flukso/luasrc/fluksod.lua b/openwrt/package/flukso/luasrc/fluksod.lua index ae149362..6f657298 100755 --- a/openwrt/package/flukso/luasrc/fluksod.lua +++ b/openwrt/package/flukso/luasrc/fluksod.lua @@ -126,15 +126,6 @@ mosq.init() local mqtt = mosq.new(MOSQ_ID, MOSQ_CLN_SESSION) mqtt:connect(MOSQ_HOST, MOSQ_PORT, MOSQ_KEEPALIVE) --- publish configuration data -local luci = require "luci" -luci.json = require "luci.json" -local DEVICE = uci:get_first('system', 'system', 'device') -local MOSQ_TOPIC_SENSOR_CONFIG = string.format('/device/%s/config/sensor', DEVICE) -local flm_config = luci.json.encode(FLUKSO) -mqtt:publish(MOSQ_TOPIC_SENSOR_CONFIG, flm_config, MOSQ_QOS, MOSQ_RETAIN) --- end of publishing configuration - local function dispatch(wan_child, lan_child) return coroutine.create(function() local delta = { From 42595786a7c978e052f52fb83af0e2f6f88bc45b Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Sat, 18 Jul 2015 12:47:58 +0200 Subject: [PATCH 10/13] reset to original distribution to start with queryd --- openwrt/package/flukso/luasrc/fluksod.lua | 9 --------- 1 file changed, 9 deletions(-) diff --git a/openwrt/package/flukso/luasrc/fluksod.lua b/openwrt/package/flukso/luasrc/fluksod.lua index ae149362..6f657298 100755 --- a/openwrt/package/flukso/luasrc/fluksod.lua +++ b/openwrt/package/flukso/luasrc/fluksod.lua @@ -126,15 +126,6 @@ mosq.init() local mqtt = mosq.new(MOSQ_ID, MOSQ_CLN_SESSION) mqtt:connect(MOSQ_HOST, MOSQ_PORT, MOSQ_KEEPALIVE) --- publish configuration data -local luci = require "luci" -luci.json = require "luci.json" -local DEVICE = uci:get_first('system', 'system', 'device') -local MOSQ_TOPIC_SENSOR_CONFIG = string.format('/device/%s/config/sensor', DEVICE) -local flm_config = luci.json.encode(FLUKSO) -mqtt:publish(MOSQ_TOPIC_SENSOR_CONFIG, flm_config, MOSQ_QOS, MOSQ_RETAIN) --- end of publishing configuration - local function dispatch(wan_child, lan_child) return coroutine.create(function() local delta = { From 0ef2cf33b362afd0a608ad170e64d023f91ae2a0 Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Sat, 18 Jul 2015 13:10:42 +0200 Subject: [PATCH 11/13] install query daemon for TMPO time series --- openwrt/package/flukso/Makefile | 6 +- openwrt/package/flukso/config/flukso.init | 8 + openwrt/package/flukso/luasrc/queryd.lua | 195 ++++++++++++++++++++++ 3 files changed, 206 insertions(+), 3 deletions(-) create mode 100644 openwrt/package/flukso/luasrc/queryd.lua diff --git a/openwrt/package/flukso/Makefile b/openwrt/package/flukso/Makefile index a27bb01f..a9afe7fa 100644 --- a/openwrt/package/flukso/Makefile +++ b/openwrt/package/flukso/Makefile @@ -8,7 +8,7 @@ include $(TOPDIR)/rules.mk PKG_NAME:=flukso -PKG_VERSION:=2.4.7 +PKG_VERSION:=2.4.8 PKG_RELEASE:=beta PKG_BUILD_DIR:=$(BUILD_DIR)/$(PKG_NAME)-$(PKG_VERSION) @@ -69,10 +69,10 @@ define Package/flukso/install $(INSTALL_DIR) $(1)/usr/sbin $(INSTALL_BIN) $(PKG_BUILD_DIR)/luad $(1)/usr/sbin/ - for link in fluksod spid parsed kubed tmpod wwd supd; do \ + for link in fluksod spid parsed kubed queryd tmpod wwd supd; do \ $(LN) /usr/sbin/luad $(1)/usr/sbin/"$$$$link"; \ done - $(INSTALL_BIN) ./luasrc/{fluksod,spid,parsed,kubed,tmpod,wwd,supd}.lua $(1)/usr/sbin/ + $(INSTALL_BIN) ./luasrc/{fluksod,spid,parsed,kubed,queryd,tmpod,wwd,supd}.lua $(1)/usr/sbin/ $(INSTALL_DIR) $(1)/usr/share/tmpo/sensor chmod a+w $(1)/usr/share/tmpo/sensor diff --git a/openwrt/package/flukso/config/flukso.init b/openwrt/package/flukso/config/flukso.init index b7435ab1..18df4982 100755 --- a/openwrt/package/flukso/config/flukso.init +++ b/openwrt/package/flukso/config/flukso.init @@ -137,6 +137,7 @@ start_ww() /usr/sbin/supd /usr/sbin/wwd /usr/sbin/tmpod -u flukso + /usr/sbin/queryd -u flukso set_ttyS0_W echo 0 > /sys/devices/virtual/gpio/gpio4/value HEARTBEAT=/usr/bin/heartbeat @@ -168,6 +169,7 @@ start_fl() /usr/sbin/spid -u flukso /usr/sbin/fluksod -u flukso /usr/sbin/tmpod -u flukso + /usr/sbin/queryd -u flukso MODEL=$(uci get system.@system[0].model) @@ -235,6 +237,9 @@ stop_ww() /usr/sbin/tmpod -k rm -rf /var/run/tmpod + /usr/sbin/queryd -k + rm -rf /var/run/queryd + /usr/sbin/wwd -k rm -rf /var/run/wwd @@ -261,6 +266,9 @@ stop_fl() /usr/sbin/tmpod -k rm -rf /var/run/tmpod + /usr/sbin/queryd -k + rm -rf /var/run/queryd + /usr/sbin/fluksod -k rm -rf /var/run/fluksod diff --git a/openwrt/package/flukso/luasrc/queryd.lua b/openwrt/package/flukso/luasrc/queryd.lua new file mode 100644 index 00000000..736789f9 --- /dev/null +++ b/openwrt/package/flukso/luasrc/queryd.lua @@ -0,0 +1,195 @@ +#!/usr/bin/env lua + +--[[ + + queryd.lua - Flukso timeseries query daemon + + Copyright (C) 2014 Bart Van Der Meerssche + 2015 Markus Gebhard + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program. If not, see . + +]]-- + +local dbg = require "dbg" +local nixio = require "nixio" +nixio.fs = require "nixio.fs" +local luci = require "luci" +luci.json = require "luci.json" +luci.util = require "luci.util" +local uci = require "luci.model.uci".cursor() +local uloop = require "uloop" +uloop.init() +local ubus = require "ubus" +local ub = assert(ubus.connect(), "unable to connect to ubus") +local mosq = require "mosquitto" + +local DEBUG = { + query = false +} + +local DAEMON = os.getenv("DAEMON") or "queryd" +local DEVICE = uci:get_first("system", "system", "device") +local ULOOP_TIMEOUT_MS = 1e3 +local SLEEP_S, SLEEP_NS = 1, 0 +local TIMESTAMP_MIN = 1234567890 + +-- TMPO params +local TMPO_FORMAT_VERSION = 1 +local TMPO_NICE = 10 +local TMPO_BASE_PATH = "/usr/share/tmpo/sensor/" +local TMPO_PATH_TPL = TMPO_BASE_PATH .. "%s/%s/%s/%s" -- [sid]/[rid]/[lvl]/[bid] +local TMPO_REGEX_QUERY = "^/query/(%x+)/tmpo$" +local TMPO_TOPIC_QUERY_PUB = "/sensor/%s/query/%s/%s" -- provide queried data as payload +local TMPO_TOPIC_QUERY_SUB = "/query/+/tmpo" -- get sensor to query with payload interval +local TMPO_FMT_QUERY = "time:%d sid:%s rid:%d lvl:%2d bid:%d" +local TMPO_LVLS_REVERSE = { 20, 16, 12, 8 } -- query runs from the past to now... + +-- mosquitto client params +local MOSQ_ID = DAEMON +local MOSQ_CLN_SESSION = true +local MOSQ_HOST = "localhost" +local MOSQ_PORT = 1883 +local MOSQ_KEEPALIVE = 900 +local MOSQ_TIMEOUT = 0 -- return instantly from select call +local MOSQ_MAX_PKTS = 1 -- packets +local MOSQ_QOS0 = 0 -- at most once +local MOSQ_QOS1 = 1 -- at least once +local MOSQ_QOS2 = 2 -- exactly once +local MOSQ_RETAIN = true +local MOSQ_ERROR = "MQTT error: %s" + +-- increase process niceness +nixio.nice(TMPO_NICE) + +-- connect to the MQTT broker +mosq.init() +local mqtt = mosq.new(MOSQ_ID, MOSQ_CLN_SESSION) +while not mqtt:connect(MOSQ_HOST, MOSQ_PORT, MOSQ_KEEPALIVE) do + nixio.nanosleep(SLEEP_S, SLEEP_NS) +end +-- subscribe to query topic +mqtt:subscribe(TMPO_TOPIC_QUERY_SUB, MOSQ_QOS0) + +mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) + local function sdir(path) + local files = { } + for file in nixio.fs.dir(path) or function() end do --dummy iterator + files[#files + 1] = tonumber(file) or file + end + table.sort(files) + return files + end + + local function dprint(fmt, ...) + if DEBUG.query then + print(fmt:format( + os.time(), + ...)) + end + end + + local function publish(sid, rid, lvl, bid, from, to) + dprint(TMPO_FMT_QUERY, sid, rid, lvl, bid) + local path = TMPO_PATH_TPL:format(sid, rid, lvl, bid) + local source = assert(io.open(path, "r")) + local payload = source:read("*all") + local topic = TMPO_TOPIC_QUERY_PUB:format(sid, from, to) + if DEBUG.query then + local str = string.format("publishing topic:%s payload:%s", topic, payload) + print(str) + end + -- query is published exactly once - QoS = 2 + mqtt:publish(topic, payload, MOSQ_QOS2, not MOSQ_RETAIN) + source:close() + end + + -- publish the stored files on a query request + local function query(sid) + if not sid then return end + -- payload contains query time interval [fromtimestamp, totimestamp] + local payload = luci.json.decode(jpayload) + if payload == nil then return end + local lastrid = 0 + local lastlvl = 0 + local lastbid = 0 + local published = false + local from = payload[1] + local to = payload[2] + if DEBUG.query then + local str = string.format("entered sensor:%s from:%s to:%s", sid, from, to) + print(str) + end + for rid in nixio.fs.dir(TMPO_BASE_PATH .. sid) do + for _, lvl in ipairs(TMPO_LVLS_REVERSE) do + for _, bid in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, lvl, ""))) do + -- detect store with containing or overlapping values + if ((from <= bid) and (bid <= to)) then + publish(sid, rid, lvl, bid, from, to) + published = true + end + if ((lastbid ~= 0) and (lastbid < from) and (bid > from)) then + publish(sid, rid, lastlvl, lastbid, from, to) + published = true + end + -- recognize overlaps in different compression stages + lastrid = rid + lastlvl = lvl + lastbid = bid + if DEBUG.query then + str = string.format("processed file /%s/%s/%s", rid, lvl, bid) + print(str) + end + end + end + end + -- send last stored file in case there were no further readings, e.g. on solar + if ((published == false) and (lastbid < from)) then + publish(sid, lastrid, lastlvl, lastbid, from, to) + end + return true + end + + if retain then return end + local q = query(topic:match(TMPO_REGEX_QUERY)) + return +end) + +local ufdr = uloop.fd(mqtt:socket(), uloop.READ, function(events) + mqtt:read(MOSQ_MAX_PKTS) +end) + +local ufdw = uloop.fd(mqtt:socket(), uloop.WRITE, function(events) + mqtt:write(MOSQ_MAX_PKTS) +end) + +local ub_events = { + ["flukso.sighup"] = function(msg) + -- do someting meaningful + end +} + +ub:listen(ub_events) + +local ut +ut = uloop.timer(function() + -- mosquitto connection maintenance + local success, errno, err = mqtt:misc() + if not success then + error(MOSQ_ERROR:format(err)) + end + ut:set(ULOOP_TIMEOUT_MS) + end, ULOOP_TIMEOUT_MS) + +uloop:run() From e034542ae933045f016ab10e74b46cdb5e4de81a Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Sun, 28 Feb 2016 18:21:24 +0100 Subject: [PATCH 12/13] reduce queryd QOS to 1 --- openwrt/package/flukso/luasrc/queryd.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openwrt/package/flukso/luasrc/queryd.lua b/openwrt/package/flukso/luasrc/queryd.lua index 736789f9..a5c261f7 100644 --- a/openwrt/package/flukso/luasrc/queryd.lua +++ b/openwrt/package/flukso/luasrc/queryd.lua @@ -110,8 +110,8 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) local str = string.format("publishing topic:%s payload:%s", topic, payload) print(str) end - -- query is published exactly once - QoS = 2 - mqtt:publish(topic, payload, MOSQ_QOS2, not MOSQ_RETAIN) + -- query is published at least once - QoS = 1 + mqtt:publish(topic, payload, MOSQ_QOS1, not MOSQ_RETAIN) source:close() end From 3df2c9e6b2b5e345b74548521db45f02f17717be Mon Sep 17 00:00:00 2001 From: Markus Gebhard Date: Sun, 28 Feb 2016 18:33:19 +0100 Subject: [PATCH 13/13] leave query to own daemon --- openwrt/package/flukso/luasrc/tmpod.lua | 93 +------------------------ 1 file changed, 2 insertions(+), 91 deletions(-) diff --git a/openwrt/package/flukso/luasrc/tmpod.lua b/openwrt/package/flukso/luasrc/tmpod.lua index ac459aac..86bbce1a 100755 --- a/openwrt/package/flukso/luasrc/tmpod.lua +++ b/openwrt/package/flukso/luasrc/tmpod.lua @@ -40,8 +40,7 @@ local DEBUG = { config = false, block8 = false, compact = false, - sync = false, - query = false + sync = false } local DAEMON = os.getenv("DAEMON") or "tmpod" @@ -77,12 +76,6 @@ local TMPO_TOPIC_SYNC_PUB = "/device/%s/tmpo/sync" local TMPO_TOPIC_SENSOR_SUB = "/sensor/+/+" local TMPO_TOPIC_SENSOR_PUB = "/sensor/%s/tmpo/%d/%d/%d/gz" -- /sensor/[sid]/tmpo/[rid]/[lvl]/[bid]/gz - -local TMPO_REGEX_QUERY = "^/query/(%x+)/tmpo$" -local TMPO_TOPIC_QUERY_PUB = "/sensor/%s/query/%s/%s" -- provide queried data as payload -local TMPO_TOPIC_QUERY_SUB = "/query/+/tmpo" -- get sensor to query with payload interval -local TMPO_FMT_QUERY = "time:%d sid:%s rid:%d lvl:%2d bid:%d" - local TMPO_TOPIC_MQTT_CHECK = string.format("/daemon/%s/check", DAEMON) local TMPO_GC20_THRESHOLD = 100 -- 100 free 4kB blocks out of +-1000 in jffs2 = 90% full local TMPO_GZCHECK_EXEC_FMT = "gzip -trS '' %s 2>&1" @@ -98,7 +91,6 @@ local MOSQ_TIMEOUT = 0 -- return instantly from select call local MOSQ_MAX_PKTS = 1 -- packets local MOSQ_QOS0 = 0 local MOSQ_QOS1 = 1 -local MOSQ_QOS2 = 2 local MOSQ_RETAIN = true local MOSQ_ERROR = "MQTT error: %s" @@ -122,8 +114,6 @@ if not mqtt:connect(MOSQ_HOST, MOSQ_PORT, MOSQ_KEEPALIVE) then end merror(mqtt:subscribe(TMPO_TOPIC_SYNC_SUB:format(DEVICE), MOSQ_QOS0)) merror(mqtt:subscribe(TMPO_TOPIC_SENSOR_SUB, MOSQ_QOS0)) --- subscribe to query topic -merror(mqtt:subscribe(TMPO_TOPIC_QUERY_SUB, MOSQ_QOS0)) local config = { sensor = nil, @@ -643,37 +633,6 @@ local tmpo = { } mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) - local function sdir(path) - local files = { } - for file in nixio.fs.dir(path) or function() end do --dummy iterator - files[#files + 1] = tonumber(file) or file - end - table.sort(files) - return files - end - - local function dprint(fmt, ...) - if DEBUG.query then - print(fmt:format( - os.time(), - ...)) - end - end - - local function publish(sid, rid, lvl, bid, from, to) - dprint(TMPO_FMT_QUERY, sid, rid, lvl, bid) - local path = TMPO_PATH_TPL:format(sid, rid, lvl, bid) - local source = assert(io.open(path, "r")) - local payload = source:read("*all") - local topic = TMPO_TOPIC_QUERY_PUB:format(sid, from, to) - if DEBUG.query then - local str = string.format("publishing topic:%s payload:%s", topic, payload) - print(str) - end - merror(mqtt:publish(topic, payload, MOSQ_QOS2, not MOSQ_RETAIN)) - source:close() - end - local function sensor(sid, dtype) local sparams = config.sensor[sid] if not (sid and sparams and dtype == sparams.data_type) then return end @@ -689,57 +648,9 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain) tmpo:sync1(payload) end - -- publish the stored files on a query request - local function query(sid) - if not sid then return end - -- payload contains query time interval [fromtimestamp, totimestamp] - local payload = luci.json.decode(jpayload) - if payload == nil then return end - local lastrid = 0 - local lastlvl = 0 - local lastbid = 0 - local published = false - local from = payload[1] - local to = payload[2] - if DEBUG.query then - local str = string.format("entered sensor:%s from:%s to:%s", sid, from, to) - print(str) - end - for rid in nixio.fs.dir(TMPO_BASE_PATH .. sid) do - for _, lvl in ipairs(TMPO_LVLS_REVERSE) do -- storage has to be queried from past to now - for _, bid in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, lvl, ""))) do - -- detect store with containing or overlapping values - if ((from <= bid) and (bid <= to)) then - publish(sid, rid, lvl, bid, from, to) - published = true - end - if ((lastbid ~= 0) and (lastbid < from) and (bid > from)) then - publish(sid, rid, lastlvl, lastbid, from, to) - published = true - end - -- recognize overlaps in different compression stages - lastrid = rid - lastlvl = lvl - lastbid = bid - if DEBUG.query then - str = string.format("processed file /%s/%s/%s", rid, lvl, bid) - print(str) - end - end - end - end - -- send last stored file in case there were no further readings, e.g. on solar - if ((published == false) and (lastbid < from)) then - publish(sid, lastrid, lastlvl, lastbid, from, to) - end - return true - end - if retain then return end if not sensor(topic:match(TMPO_REGEX_SENSOR)) then - if not query(topic:match(TMPO_REGEX_QUERY)) then - sync(topic:match(TMPO_REGEX_SYNC)) - end + sync(topic:match(TMPO_REGEX_SYNC)) end end)