tb.c 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360
  1. #include "tb.h"
  2. #include "plt.h"
  3. struct tb_t tb;
  4. /* in seconds */
  5. static long long tb_get_unixts()
  6. {
  7. return (long long)time(NULL);
  8. }
  9. /* in ms */
  10. long long tb_get_ts()
  11. {
  12. return (tb_get_unixts() + 0 * (long long)3600) * (long long)1000;
  13. // return tb_get_unixts()*1000;
  14. }
  15. static int tb_upload_meter()
  16. {
  17. struct tb_t *c = &tb;
  18. tbmqtt_ringbuffer_element_t e;
  19. char buf[128];
  20. char itm_buf[2048];
  21. int i;
  22. if (c->meter_intv[0] > 0)
  23. {
  24. if (c->meter_timer[0]++ >= c->meter_intv[0])
  25. {
  26. c->meter_timer[0] = 0;
  27. e.cmd = CMD_MQTT_SENDKV;
  28. e.sztopic[0] = 0;
  29. e.szpayload[0] = 0;
  30. /* sw version */
  31. log_dbg("%s, data : %s", __func__, itm_buf);
  32. sprintf(e.szpayload, "{'ts':%lld,'values':{%s}}", tb_get_ts(), itm_buf); /* thingsboard */
  33. // sprintf(e.sztopic,"ems/data/rtEss/%s", sta_get_sn());
  34. strcpy(e.sztopic, "v1/devices/me/telemetry");
  35. tbmqtt_lock_txbuf();
  36. tbmqtt_queue_txbuf(e);
  37. tbmqtt_unlock_txbuf();
  38. }
  39. }
  40. return 0;
  41. }
  42. static int tb_send_rpc_response(char *request_topic, char *response_payload)
  43. {
  44. char *p = NULL;
  45. char szreqid[256];
  46. char *pszreqid = szreqid;
  47. int reqid = 0;
  48. tbmqtt_ringbuffer_element_t e;
  49. p = strrchr(request_topic, '/');
  50. if (p != NULL)
  51. {
  52. p++;
  53. while (*p != 0)
  54. {
  55. *pszreqid++ = *p++;
  56. }
  57. *pszreqid = 0;
  58. reqid = atoi(szreqid);
  59. e.cmd = CMD_MQTT_SENDKV;
  60. e.sztopic[0] = 0;
  61. e.szpayload[0] = 0;
  62. strcpy(e.szpayload, response_payload);
  63. sprintf(e.sztopic, "v1/devices/me/rpc/response/%s", szreqid);
  64. log_dbg("%s, topic:%s, payload:%s", __func__, e.sztopic, e.szpayload);
  65. tbmqtt_lock_txbuf();
  66. tbmqtt_queue_txbuf(e);
  67. tbmqtt_unlock_txbuf();
  68. }
  69. return 0;
  70. }
  71. static int tb_update_attributes(const char *payload)
  72. {
  73. tbmqtt_ringbuffer_element_t e;
  74. e.cmd = CMD_MQTT_SENDKV;
  75. e.sztopic[0] = 0;
  76. e.szpayload[0] = 0;
  77. sprintf(e.szpayload, "{%s}", payload);
  78. strcpy(e.sztopic, "v1/devices/me/attributes");
  79. log_dbg("%s, topic:%s, payload:%s", __func__, e.sztopic, e.szpayload);
  80. tbmqtt_lock_txbuf();
  81. tbmqtt_queue_txbuf(e);
  82. tbmqtt_unlock_txbuf();
  83. return 0;
  84. }
  85. static void tb_upload_param()
  86. {
  87. struct tb_t *dev = &tb;
  88. tbmqtt_ringbuffer_element_t e;
  89. char buf[128];
  90. char itm_buf[2048];
  91. int i;
  92. /* 5 seconds period */
  93. if (dev->param_en > 0)
  94. {
  95. dev->param_en = 0;
  96. e.cmd = CMD_MQTT_SENDKV;
  97. e.sztopic[0] = 0;
  98. e.szpayload[0] = 0;
  99. /* sw version */
  100. sprintf(e.szpayload, "{'ts':%lld,'values':{%s}}", tb_get_ts(), itm_buf);
  101. strcpy(e.sztopic, "v1/devices/me/telemetry");
  102. tbmqtt_lock_txbuf();
  103. tbmqtt_queue_txbuf(e);
  104. tbmqtt_unlock_txbuf();
  105. }
  106. }
  107. static void tb_upload_sys()
  108. {
  109. struct tb_t *c = &tb;
  110. tbmqtt_ringbuffer_element_t e;
  111. char buf[128];
  112. char itm_buf[2048];
  113. int i;
  114. /* 10 seconds period */
  115. if (c->sys_intv[0] > 0)
  116. {
  117. if (c->sys_timer[0]++ >= c->sys_intv[0])
  118. {
  119. c->sys_timer[0] = 0;
  120. e.cmd = CMD_MQTT_SENDKV;
  121. e.sztopic[0] = 0;
  122. e.szpayload[0] = 0;
  123. // sys state and err
  124. itm_buf[0] = 0;
  125. sprintf(e.szpayload, "{'ts':%lld,'values':{%s}}", tb_get_ts(), itm_buf);
  126. strcpy(e.sztopic, "v1/devices/me/telemetry");
  127. tbmqtt_lock_txbuf();
  128. tbmqtt_queue_txbuf(e);
  129. tbmqtt_unlock_txbuf();
  130. }
  131. }
  132. /* 60 seconds period */
  133. if (c->sys_intv[1] > 0)
  134. {
  135. if (c->sys_timer[1]++ >= c->sys_intv[1])
  136. {
  137. c->sys_timer[1] = 0;
  138. e.cmd = CMD_MQTT_SENDKV;
  139. e.sztopic[0] = 0;
  140. e.szpayload[0] = 0;
  141. itm_buf[0] = 0;
  142. sprintf(e.szpayload, "{'ts':%lld,'values':{%s}}", tb_get_ts(), itm_buf);
  143. strcpy(e.sztopic, "v1/devices/me/telemetry");
  144. tbmqtt_lock_txbuf();
  145. tbmqtt_queue_txbuf(e);
  146. tbmqtt_unlock_txbuf();
  147. }
  148. }
  149. }
  150. #if 0
  151. static void tb_upload_meter( void )
  152. {
  153. struct tb_t* dev = &tb;
  154. tbmqtt_ringbuffer_element_t e;
  155. char buf[8192];
  156. char itm_buf[8192];
  157. int i;
  158. /* 60 seconds */
  159. if( dev->meter_intv[0] > 0){
  160. if( dev->meter_timer[0]++ >= dev->meter_intv[0] ){
  161. dev->meter_timer[0] = 0;
  162. // dtsd1352
  163. e.cmd = CMD_MQTT_SENDKV;
  164. e.sztopic[0] = 0;
  165. e.szpayload[0] = 0;
  166. memset(itm_buf,0,sizeof(itm_buf));
  167. meter_get_tbmqtt_data(itm_buf);
  168. sprintf(e.szpayload,"{'ts':%lld,'values':{%s}}", tb_get_ts(), itm_buf);
  169. //printf("%s,%s\n",__func__,e.szpayload);
  170. strcpy(e.sztopic,"v1/devices/me/telemetry");
  171. tbmqtt_lock_txbuf();
  172. tbmqtt_queue_txbuf(e);
  173. tbmqtt_unlock_txbuf();
  174. }
  175. }
  176. }
  177. #endif
  178. int tb_init(void)
  179. {
  180. log_dbg("%s, ++,", __func__);
  181. struct tb_t *dev = &tb;
  182. int ret = 0;
  183. dev->upload_enable = 1;
  184. dev->param_en = 0;
  185. dev->sys_intv[0] = 10;
  186. dev->sys_intv[1] = 60;
  187. dev->meter_intv[0] = 5;
  188. dev->tb_lock_timer = 0;
  189. dev->tb_lock = 1;
  190. dev->tb_lock_intv = 15;
  191. log_dbg("%s, --, ret:%d", __func__, ret);
  192. return ret;
  193. }
  194. static int tmpaps_received = 0;
  195. static int tmpaps;
  196. static void tb_proc_recv()
  197. {
  198. struct tb_t *dev = &tb;
  199. static int init = 0;
  200. if(init == 0)
  201. {
  202. init = 1;
  203. tb_update_attributes("'TbLock':true");
  204. }
  205. if ((dev->tb_lock == 0) && (dev->tb_lock_timer++ >= dev->tb_lock_intv))
  206. {
  207. dev->tb_lock_timer = 0;
  208. dev->tb_lock = 1;
  209. tb_update_attributes("'TbLock':true");
  210. }
  211. cJSON *root = NULL;
  212. cJSON *item = NULL;
  213. int aps = 0;
  214. int rc;
  215. tbmqtt_ringbuffer_element_t e;
  216. tbmqtt_lock_rxbuf();
  217. rc = tbmqtt_dequeue_rxbuf(&e);
  218. tbmqtt_unlock_rxbuf();
  219. if (rc == 1)
  220. {
  221. log_dbg("%s, get topic:%s", __func__, e.sztopic);
  222. log_dbg("%s, and payload:%s", __func__, e.szpayload);
  223. root = cJSON_Parse(e.szpayload);
  224. if (!root)
  225. {
  226. log_dbg("%s, cJSON_Parse null", __func__);
  227. }
  228. else
  229. {
  230. item = cJSON_GetObjectItem(root, "method");
  231. if (!item)
  232. {
  233. log_dbg("%s, cJSON_GetObjectItem method fail", __func__);
  234. }
  235. else
  236. {
  237. log_dbg("%s, get method:%s", __func__, item->valuestring);
  238. if (strcmp(item->valuestring, "set_lock") == 0)
  239. {
  240. item = cJSON_GetObjectItem(root, "params");
  241. if (!item)
  242. {
  243. log_dbg("%s, set_lock failed to get params ", __func__);
  244. }
  245. else
  246. {
  247. log_dbg("%s, set_lock, value :%s", __func__, item->valuestring);
  248. if (strcmp(item->valuestring, "lock") == 0)
  249. {
  250. dev->tb_lock = 1;
  251. }
  252. else
  253. {
  254. dev->tb_lock = 0;
  255. }
  256. }
  257. }
  258. else
  259. {
  260. if (dev->tb_lock)
  261. {
  262. log_dbg("%s, thingsboard is locked, do nothing", __func__);
  263. }
  264. else
  265. {
  266. if (strcmp(item->valuestring, "stdby") == 0)
  267. {
  268. }
  269. else if (strcmp(item->valuestring, "set_aps") == 0)
  270. {
  271. item = cJSON_GetObjectItem(root, "params");
  272. if (!item)
  273. {
  274. log_dbg("%s, cJSON_GetObjectItem params fail", __func__);
  275. }
  276. else
  277. {
  278. // ess_set_aps(item->valueint);
  279. log_dbg("%s, aps:%d", __func__, item->valueint);
  280. tmpaps = item->valueint;
  281. tmpaps_received = 1;
  282. }
  283. }
  284. else
  285. {
  286. log_dbg("%s, unknown cmd : %s", __func__, item->valuestring);
  287. }
  288. }
  289. }
  290. }
  291. cJSON_Delete(root);
  292. }
  293. }
  294. }
  295. void tb_exe(void)
  296. {
  297. tb_proc_recv();
  298. if (tb.upload_enable)
  299. {
  300. tb_upload_param();
  301. tb_upload_sys();
  302. tb_upload_meter();
  303. }
  304. }
  305. int tb_set_param_en(int val)
  306. {
  307. struct tb_t *dev = &tb;
  308. dev->param_en = val;
  309. }