Skip to content
Open
Changes from 1 commit
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
1e2a0b5
make 'old' flukso daemon publish configuration data
gebhardm Jan 3, 2015
955c4e0
Preparation of query on tempo data
gebhardm Jan 30, 2015
94a9214
next step in providing tmpo data on query
gebhardm Feb 14, 2015
9303375
tmpo daemaon with query callback
gebhardm Feb 15, 2015
5cac127
sync with origin develop
gebhardm Feb 15, 2015
ebacd41
sync with develop
gebhardm Feb 15, 2015
8ee66f1
return conditions/interval boundaries
gebhardm Feb 16, 2015
af1907f
overlapping intervals corrected
gebhardm Feb 22, 2015
b212a09
[tmpo] correct query order of storage
gebhardm Feb 23, 2015
2633729
[tmpo] query potential data outside interval
gebhardm Feb 26, 2015
7fd4440
Merge remote-tracking branch 'upstream/develop' into develop
gebhardm Jun 14, 2015
8b8cc23
Merge branch 'develop' into tmpoquery
gebhardm Jun 14, 2015
e9f1278
config publishing in tmpod
gebhardm Jun 27, 2015
d3d8837
upstram merge
gebhardm Jun 28, 2015
36fd227
Merge branch 'tmpoquery' of https://github.com/gebhardm/flm02 into tmp…
gebhardm Jun 28, 2015
4b5ca91
Merge remote-tracking branch 'upstream/develop' into tmpoquery
gebhardm Jun 28, 2015
e35311c
Merge branch 'develop' of https://github.com/gebhardm/flm02 into develop
gebhardm Jul 18, 2015
4259578
reset to original distribution to start with queryd
gebhardm Jul 18, 2015
0ef2cf3
install query daemon for TMPO time series
gebhardm Jul 18, 2015
74bbdfe
Merge branch 'tmpoquery' of https://github.com/gebhardm/flm02 into tm…
gebhardm Jul 18, 2015
5af5e85
Merge branch 'develop' of https://github.com/flukso/flm02 into develop
gebhardm Dec 21, 2015
c575b38
sync with original
gebhardm Dec 21, 2015
e2b0ec4
Merge branch 'tmpoquery' of https://github.com/gebhardm/flm02 into tm…
gebhardm Feb 21, 2016
e034542
reduce queryd QOS to 1
gebhardm Feb 28, 2016
3df2c9e
leave query to own daemon
gebhardm Feb 28, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
tmpo daemaon with query callback
  • Loading branch information
gebhardm committed Feb 15, 2015
commit 93033759e755aaf68c1251d937375e3c92ce80c6
69 changes: 50 additions & 19 deletions openwrt/package/flukso/luasrc/tmpod.lua
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ local DEBUG = {
config = false,
block8 = false,
compact = false,
sync = false
sync = false,
query = false
}

local DAEMON = os.getenv("DAEMON") or "tmpod"
Expand Down Expand Up @@ -70,15 +71,16 @@ local TMPO_REGEX_V1 = '^,"v":%[0(.*)$'
local TMPO_REGEX_V2 = '^(.-)%].*$'
local TMPO_REGEX_SYNC = "^/d/device/(%x+)/tmpo/sync$"
local TMPO_REGEX_SENSOR = "^/sensor/(%x+)/(%l+)$"
local TMPO_REGEX_QUERY = "^/query/(%x+)/tmpo$"
local TMPO_TOPIC_SYNC_SUB = "/d/device/%s/tmpo/sync"
local TMPO_TOPIC_SYNC_PUB = "/device/%s/tmpo/sync"
local TMPO_TOPIC_SENSOR_SUB = "/sensor/+/+"
local TMPO_TOPIC_SENSOR_PUB = "/sensor/%s/tmpo/%d/%d/%d/gz"
-- /sensor/[sid]/tmpo/[rid]/[lvl]/[bid]/gz

local TMPO_REGEX_QUERY = "^/query/(%x+)/tmpo$"
local TMPO_TOPIC_QUERY_PUB = "/sensor/%s/query" -- provide queried data as payload
local TMPO_TOPIC_QUERY_SUB = "/query/+/tmpo" -- get sensor to query with payload interval
local TMPO_FMT_QUERY = "time:%d sid:%s rid:%d lvl:%2d bid:%d"

local TMPO_GC20_THRESHOLD = 100 -- 100 free 4kB blocks out of +-1000 in jffs2 = 90% full
local TMPO_GZCHECK_EXEC_FMT = "gzip -trS '' %s 2>&1"
Expand All @@ -94,6 +96,7 @@ local MOSQ_TIMEOUT = 0 -- return instantly from select call
local MOSQ_MAX_PKTS = 1 -- packets
local MOSQ_QOS0 = 0
local MOSQ_QOS1 = 1
local MOSQ_QOS2 = 2
local MOSQ_RETAIN = true
local MOSQ_ERROR = "MQTT error: %s"

Expand Down Expand Up @@ -614,6 +617,27 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain)
return files
end

local function dprint(fmt, ...)
if DEBUG.query then
print(fmt:format(
os.time(),
...))
end
end

local function publish(sid, rid, lvl, bid)
dprint(TMPO_FMT_QUERY, sid, rid, lvl, bid)
local path = TMPO_PATH_TPL:format(sid, rid, lvl, bid)
local source = assert(io.open(path, "r"))
local payload = source:read("*all")
local topic = TMPO_TOPIC_QUERY_PUB:format(sid)
if DEBUG.query then
print("publishing", topic, payload)
end
mqtt:publish(topic, payload, MOSQ_QOS2, not MOSQ_RETAIN)
source:close()
end

local function sensor(sid, dtype)
local sparams = config.sensor[sid]
if not (sid and sparams and dtype == sparams.data_type) then return end
Expand All @@ -629,23 +653,30 @@ mqtt:set_callback(mosq.ON_MESSAGE, function(mid, topic, jpayload, qos, retain)
tmpo:sync1(payload)
end

-- publish the stored files on a query request
local function query(sid)
-- payload contains query time interval {from:fromtimestamp, to:totimestamp}
local payload = luci.json.decode(jpayload)
for rid in nixio.fs.dir(TMPO_BASE_PATH .. sid) do
for _, lvl in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, "", ""))) do
for _, bid in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, lvl, ""))) do
if bid >= payload.from and bid <= payload.to
-- publish the respective file containing the requested values
-- note, the query interval may be smaller than a file's content
-- then the respective file must be sent
end
end
end
end
return true
end
-- publish the stored files on a query request
local function query(sid)
-- payload contains query time interval [fromtimestamp, totimestamp]
local payload = luci.json.decode(jpayload)
local lastbid = 0
if DEBUG.query then
print("entered query with ", sid, payload[1], payload[2])
end
for rid in nixio.fs.dir(TMPO_BASE_PATH .. sid) do
for _, lvl in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, "", ""))) do
for _, bid in ipairs(sdir(TMPO_PATH_TPL:format(sid, rid, lvl, ""))) do
-- detect store with containing or overlapping values
if ((payload[1] <= bid) and (bid <= payload[2])) then
publish(sid, rid, lvl, bid)
end
if ((lastbid ~= 0) and (bid >= payload[2]) and (lastbid <= payload[1])) then
publish(sid, rid, lvl, lastbid)
end
lastbid = bid
end
end
end
return true
end

if retain then return end
if not sensor(topic:match(TMPO_REGEX_SENSOR)) then
Expand Down