tbmqtt.c 18 KB


  1. /*******************************************************************************
  2. * Copyright (c) 2012, 2020 IBM Corp.
  3. *
  4. * All rights reserved. This program and the accompanying materials
  5. * are made available under the terms of the Eclipse Public License v2.0
  6. * and Eclipse Distribution License v1.0 which accompany this distribution.
  7. *
  8. * The Eclipse Public License is available at
  9. * https://www.eclipse.org/legal/epl-2.0/
  10. * and the Eclipse Distribution License is available at
  11. * http://www.eclipse.org/org/documents/edl-v10.php.
  12. *
  13. * Contributors:
  14. * Ian Craggs - initial contribution
  15. * Ian Craggs - add full capability
  16. *******************************************************************************/
  17. #include "plt.h"
  18. #include <signal.h>
  19. #include <stdio.h>
  20. #include <stdlib.h>
  21. #include <string.h>
  22. #include <sys/time.h>
  23. #include "tbmqtt_cache.h"
  24. #define TBMQTT_CACHE_DATA_BASE "../data/tbmqtt.db"
  25. #define TBMQTT_CACHE_TABLE "tbmqtt_cache"
  26. #define TX_BUFF_TO_CACHE_USAGE (75.0)
  27. #define CACHE_TO_TX_BUFF_USAGE (65.0)
  28. #define CACHE_FILE_MEMORY_SIZE_MAX (40 * 1024 * 1024)
  29. static tbmqtt_ringbuffer_t tbmqtt_txbuf;
  30. static pthread_mutex_t tbmqtt_txbuf_mutex;
  31. static tbmqtt_ringbuffer_t tbmqtt_rxbuf;
  32. static pthread_mutex_t tbmqtt_rxbuf_mutex;
  33. static void *tbmqtt_cache_handle = NULL;
  34. struct tbmqtt_t tbmqtt;
  35. double tbmqtt_get_timeofday()
  36. {
  37. struct timeval tv;
  38. struct timezone tz;
  39. gettimeofday(&tv, &tz);
  40. return (double)tv.tv_sec * 1000 + (double)tv.tv_usec / 1000;
  41. }
  42. static void tbmqtt_connlost(void *context, char *cause)
  43. {
  44. struct tbmqtt_t *dev = &tbmqtt;
  45. log_dbg("%s, mqtt connection lost, cause: %s\n", __func__, cause);
  46. dev->connlost = 1;
  47. dev->connLostCnt++;
  48. /*
  49. tbmqtt_lock_txbuf();
  50. MQTTClient_destroy(&mqtt->cli);
  51. mqtt->reconncnt++;
  52. if(tbmqtt_connect()!=0){
  53. mqtt->connect = 0;
  54. }else{
  55. mqtt->connect = 1;
  56. }
  57. tbmqtt_unlock_txbuf();
  58. */
  59. }
  60. static int tbmqtt_msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
  61. {
  62. struct tbmqtt_t *dev = &tbmqtt;
  63. struct statemachine_t *sm = &dev->sm;
  64. int rc = 0;
  65. tbmqtt_ringbuffer_element_t e;
  66. char *pdst = NULL;
  67. char *psrc = NULL;
  68. int i;
  69. // log_dbg("%s, Message arrived, topic:%s topic len:%d payload len:%d", __func__, topicName,topicLen, message->payloadlen);
  70. if (message)
  71. {
  72. strcpy(e.sztopic, topicName);
  73. strncpy(e.szpayload, message->payload, message->payloadlen);
  74. if (dev->dbg)
  75. {
  76. log_dbg("%s, Message arrived, topic:%s topic len:%d payload len:%d", __func__, topicName, topicLen, message->payloadlen);
  77. log_dbg("%s, payload:%s", __func__, e.szpayload);
  78. }
  79. MQTTClient_freeMessage(&message);
  80. MQTTClient_free(topicName);
  81. tbmqtt_lock_rxbuf();
  82. tbmqtt_queue_rxbuf(e);
  83. tbmqtt_unlock_rxbuf();
  84. }
  85. }
  86. // static int tbmqtt_msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
  87. // {
  88. // int i;
  89. // char* topicptr;
  90. // char topicbuf[1024];
  91. // char plbuf[1024];
  92. // char* payloadptr;
  93. // char szreqid[256];
  94. // char* pszreqid = szreqid;
  95. // char* p = NULL;
  96. // int reqid = 0;
  97. // tbmqtt_ringbuffer_element_t e;
  98. // /*
  99. // p = plbuf;
  100. // payloadptr = message->payload;
  101. // for( i = 0; i < message->payloadlen; i++ ){
  102. // *p++ = *payloadptr++;
  103. // }
  104. // *p = 0;
  105. // p = topicbuf;
  106. // topicptr = topicName;
  107. // for( i = 0; i < topicLen; i++ ){
  108. // *p++ = *topicptr++;
  109. // }
  110. // *p = 0;
  111. // */
  112. // log_dbg("%s, Message arrived, topic:%s topic len:%d payload len:%d",
  113. // __func__, topicbuf,topicLen, message->payloadlen);
  114. // #if 0
  115. // if( topicLen > 0 ){
  116. // p = strrchr(topicName, '/');
  117. // if( p != NULL ){
  118. // p++;
  119. // while( *p != 0){
  120. // *pszreqid++ = *p++;
  121. // }
  122. // *pszreqid = 0;
  123. // reqid = atoi(szreqid);
  124. // log_dbg("%s, requestid:%s|%d", __func__, szreqid, reqid);
  125. // e.cmd = CMD_MQTT_SENDKV;
  126. // e.sztopic[0] = 0;
  127. // e.szpayload[0] = 0;
  128. // strcpy(e.szpayload, plbuf);
  129. // sprintf(e.sztopic,"v1/devices/me/rpc/response/%s",szreqid);
  130. // tbmqtt_lock_txbuf();
  131. // tbmqtt_ringbuffer_queue(tbmqtt_rb, e);
  132. // tbmqtt_unlock_txbuf();
  133. // }
  134. // }
  135. // #endif
  136. // MQTTClient_freeMessage(&message);
  137. // MQTTClient_free(topicName);
  138. // return 0;
  139. // }
  140. static void tbmqtt_delivered(void *context, MQTTClient_deliveryToken dt)
  141. {
  142. struct tbmqtt_t *dev = &tbmqtt;
  143. if (dev->dbg)
  144. {
  145. log_dbg("%s, Message with token value %d delivery confirmed", __func__, dt);
  146. }
  147. }
  148. int tbmqtt_connect(void)
  149. {
  150. int ret = 0;
  151. int rc;
  152. char buf[1024];
  153. struct tbmqtt_t *mqtt = &tbmqtt;
  154. int qos = 2;
  155. MQTTClient_connectOptions tmpconn_opts = MQTTClient_connectOptions_initializer;
  156. mqtt->conn_opts = tmpconn_opts;
  157. sprintf(buf, "tcp://%s:%d", mqtt->szservip, mqtt->servport);
  158. if ((rc = MQTTClient_create(&mqtt->cli, buf, mqtt->szclientid, MQTTCLIENT_PERSISTENCE_NONE, NULL)) != MQTTCLIENT_SUCCESS)
  159. {
  160. log_dbg("%s, MQTTClient_create fail:%s", __func__, MQTTClient_strerror(rc));
  161. ret = -1;
  162. goto leave;
  163. }
  164. mqtt->conn_opts.keepAliveInterval = 100;
  165. mqtt->conn_opts.cleansession = 1;
  166. mqtt->conn_opts.username = mqtt->szaccesstoken;
  167. // mqtt->conn_opts.username = mqtt->szusername;
  168. // mqtt->conn_opts.password = mqtt->szpasswd;
  169. MQTTClient_setCallbacks(mqtt->cli, NULL, tbmqtt_connlost, tbmqtt_msgarrvd, NULL);
  170. if ((rc = MQTTClient_connect(mqtt->cli, &mqtt->conn_opts)) != MQTTCLIENT_SUCCESS)
  171. {
  172. log_dbg("%s, MQTTClient_connect fail:%s", __func__, MQTTClient_strerror(rc));
  173. ret = -1;
  174. goto leave;
  175. }
  176. // sprintf(buf, "ems/data/EssCmd/%s", sta_get_sn());
  177. rc = MQTTClient_subscribe(mqtt->cli, "v1/devices/me/rpc/request/+", qos);
  178. if (rc != MQTTCLIENT_SUCCESS && rc != qos)
  179. {
  180. log_dbg("%s, MQTTClient_subscribe fail:%d", __func__, rc);
  181. ret = -2;
  182. goto leave;
  183. }
  184. leave:
  185. return ret;
  186. }
  187. int tbmqtt_get_state()
  188. {
  189. struct tbmqtt_t *dev = &tbmqtt;
  190. return dev->sm.state;
  191. }
  192. /* tx ringbuffer handle */
  193. void tbmqtt_init_txbuf()
  194. {
  195. tbmqtt_ringbuffer_init(&tbmqtt_txbuf);
  196. }
  197. void tbmqtt_lock_txbuf()
  198. {
  199. pthread_mutex_lock(&tbmqtt_txbuf_mutex);
  200. }
  201. void tbmqtt_unlock_txbuf()
  202. {
  203. pthread_mutex_unlock(&tbmqtt_txbuf_mutex);
  204. }
  205. void tbmqtt_queue_txbuf(tbmqtt_ringbuffer_element_t data)
  206. {
  207. tbmqtt_ringbuffer_queue(&tbmqtt_txbuf, data);
  208. }
  209. int tbmqtt_dequeue_txbuf(tbmqtt_ringbuffer_element_t *data)
  210. {
  211. return tbmqtt_ringbuffer_dequeue(&tbmqtt_txbuf, data);
  212. }
  213. int tbmqtt_peek_txbuf(tbmqtt_ringbuffer_element_t *data, tbmqtt_ringbuffer_size_t index)
  214. {
  215. return tbmqtt_ringbuffer_peek(&tbmqtt_txbuf, data, 0);
  216. }
  217. /* rx ringbuffer handle */
  218. void tbmqtt_init_rxbuf()
  219. {
  220. tbmqtt_ringbuffer_init(&tbmqtt_rxbuf);
  221. }
  222. void tbmqtt_lock_rxbuf()
  223. {
  224. pthread_mutex_lock(&tbmqtt_rxbuf_mutex);
  225. }
  226. void tbmqtt_unlock_rxbuf()
  227. {
  228. pthread_mutex_unlock(&tbmqtt_rxbuf_mutex);
  229. }
  230. void tbmqtt_queue_rxbuf(tbmqtt_ringbuffer_element_t data)
  231. {
  232. tbmqtt_ringbuffer_queue(&tbmqtt_rxbuf, data);
  233. }
  234. int tbmqtt_dequeue_rxbuf(tbmqtt_ringbuffer_element_t *data)
  235. {
  236. return tbmqtt_ringbuffer_dequeue(&tbmqtt_rxbuf, data);
  237. }
  238. int tbmqtt_peek_rxbuf(tbmqtt_ringbuffer_element_t *data, tbmqtt_ringbuffer_size_t index)
  239. {
  240. return tbmqtt_ringbuffer_peek(&tbmqtt_rxbuf, data, 0);
  241. }
  242. int tbmqtt_pub(char *sztopic, char *szpayload)
  243. {
  244. double pub_time;
  245. int ret = 0;
  246. int rc;
  247. struct tbmqtt_t *dev = &tbmqtt;
  248. if (dev->dbg)
  249. {
  250. log_dbg("%s Message with delivery token:%d, topic:%s, payload:%s\n",
  251. __func__, dev->token, sztopic, szpayload);
  252. }
  253. MQTTClient_message msg = MQTTClient_message_initializer;
  254. dev->pub_starttime = tbmqtt_get_timeofday();
  255. msg.payload = szpayload;
  256. msg.payloadlen = (int)strlen(szpayload);
  257. msg.qos = 1;
  258. msg.retained = 0;
  259. #ifdef DEBUG_MQTT
  260. log_dbg("Publish");
  261. #endif
  262. rc = MQTTClient_publishMessage(dev->cli, sztopic, &msg, &dev->token);
  263. if (rc != MQTTCLIENT_SUCCESS)
  264. {
  265. log_dbg("%s, MQTTClient_publisMessage fail:%s", __func__, MQTTClient_strerror(rc));
  266. dev->pub_failed++;
  267. ret = -1;
  268. goto leave;
  269. }
  270. // printf("Waiting for up to %d seconds for publication of %s\n"
  271. // "on topic %s for client with ClientID: %s\n",
  272. // (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
  273. #ifdef DEBUG_MQTT
  274. log_dbg("Wait for completion");
  275. #endif
  276. rc = MQTTClient_waitForCompletion(dev->cli, dev->token, 100000L);
  277. if (rc != MQTTCLIENT_SUCCESS)
  278. {
  279. log_dbg("%s, MQTTClient_waitForCompletion fail:%s", __func__, MQTTClient_strerror(rc));
  280. dev->pub_failed++;
  281. ret = -2;
  282. goto leave;
  283. }
  284. else
  285. {
  286. #ifdef DEBUG_MQTT
  287. log_dbg("Done");
  288. #endif
  289. dev->pub_endtime = tbmqtt_get_timeofday();
  290. pub_time = dev->pub_endtime - dev->pub_starttime;
  291. dev->pubTotalCnt += 1;
  292. dev->pubTotalTime += pub_time;
  293. dev->pubAvg = dev->pubTotalTime / dev->pubTotalCnt;
  294. if (pub_time > dev->pubMax)
  295. {
  296. dev->pubMax = pub_time;
  297. }
  298. if (dev->pubTotalCnt > 1000000)
  299. {
  300. dev->pubTotalCnt = 0.0;
  301. dev->pubTotalTime = 0.0;
  302. dev->pubAvg = 0.0;
  303. dev->pubMax = 0.0;
  304. }
  305. if (dev->pub_failed > dev->pub_maxFailcnt)
  306. {
  307. dev->pub_maxFailcnt = dev->pub_failed;
  308. }
  309. dev->pub_totalFailcnt += dev->pub_failed;
  310. dev->pub_failed = 0;
  311. }
  312. leave:
  313. if (dev->dbg && ret != 0)
  314. {
  315. log_dbg("%s, ret:%d", __func__, ret);
  316. }
  317. return ret;
  318. }
  319. int tbmqtt_send_sm_cmd(int cmd)
  320. {
  321. tbmqtt.cmd = cmd;
  322. log_dbg("%s, cmd:%d", __func__, cmd);
  323. }
  324. static void tbmqtt_deal_with_cache(void)
  325. {
  326. struct tbmqtt_t *dev = &tbmqtt;
  327. double *txbuf_usage = &dev->txbuf_usage;
  328. tbmqtt_ringbuffer_element_t e;
  329. int rc = 0;
  330. int cache_item_nb = 0;
  331. int cache_item_idx = 0;
  332. static int count = 0;
  333. if (tbmqtt_cache_handle == NULL)
  334. {
  335. return;
  336. }
  337. if (*txbuf_usage > TX_BUFF_TO_CACHE_USAGE)
  338. {
  339. // get file memory size
  340. long cache_memroy_size = 0;
  341. tbmqtt_cache_get_memory_size(tbmqtt_cache_handle, &cache_memroy_size);
  342. if (cache_memroy_size > CACHE_FILE_MEMORY_SIZE_MAX)
  343. {
  344. return;
  345. }
  346. tbmqtt_lock_txbuf();
  347. rc = tbmqtt_peek_txbuf(&e, 0);
  348. tbmqtt_unlock_txbuf();
  349. if (rc == 1)
  350. {
  351. // write to cache
  352. if (tbmqtt_cache_write_one_payload(tbmqtt_cache_handle, TBMQTT_CACHE_TABLE, e) == 0)
  353. { // if success,delet from buf
  354. tbmqtt_lock_txbuf();
  355. tbmqtt_dequeue_txbuf(&e);
  356. tbmqtt_unlock_txbuf();
  357. }
  358. }
  359. }
  360. else if (*txbuf_usage < CACHE_TO_TX_BUFF_USAGE)
  361. {
  362. rc = tbmqtt_cache_get_payload_nb(tbmqtt_cache_handle, TBMQTT_CACHE_TABLE, &cache_item_nb);
  363. if (cache_item_nb > 0 && rc == 0)
  364. { // have payload in cache
  365. // read one payload
  366. e.sztopic[0] = 0;
  367. e.szpayload[0] = 0;
  368. tbmqtt_cache_read_one_payload(tbmqtt_cache_handle, TBMQTT_CACHE_TABLE, &e, &cache_item_idx);
  369. e.cmd = CMD_MQTT_SENDKV;
  370. // add to txbuf
  371. tbmqtt_lock_txbuf();
  372. tbmqtt_queue_txbuf(e);
  373. tbmqtt_unlock_txbuf();
  374. // delete from cache
  375. tbmqtt_cache_delete_one_payload(tbmqtt_cache_handle, TBMQTT_CACHE_TABLE, cache_item_idx);
  376. count++;
  377. // free memory
  378. if (count > 500)
  379. {
  380. count = 0;
  381. tbmqtt_cache_free_memory(tbmqtt_cache_handle);
  382. }
  383. } // else have no data in cache
  384. }
  385. else
  386. {
  387. // do nothing
  388. }
  389. }
  390. static void tbmqtt_db_cache_thrd_main(void *param)
  391. {
  392. log_dbg("%s, ++", __func__);
  393. // open or cread cache table
  394. tbmqtt_cache_open(TBMQTT_CACHE_DATA_BASE, TBMQTT_CACHE_TABLE, &tbmqtt_cache_handle);
  395. while (1)
  396. {
  397. tbmqtt_deal_with_cache();
  398. usleep(10000); /* 10ms */
  399. }
  400. log_dbg("%s, --", __func__);
  401. }
  402. static void *tbmqtt_thrd_main(void *param)
  403. {
  404. struct tbmqtt_t *dev = &tbmqtt;
  405. struct statemachine_t *sm = &dev->sm;
  406. pthread_t xthrd;
  407. log_dbg("%s, ++", __func__);
  408. /* reset pub timings */
  409. dev->connLostCnt = 0;
  410. dev->pubTotalCnt = 0.0;
  411. dev->pubTotalTime = 0.0;
  412. dev->pubAvg = 0.0;
  413. dev->pubMax = 0.0;
  414. tbmqtt_init_txbuf();
  415. tbmqtt_init_rxbuf();
  416. pthread_mutex_init(&tbmqtt_txbuf_mutex, NULL);
  417. pthread_mutex_init(&tbmqtt_rxbuf_mutex, NULL);
  418. // open or cread cache table
  419. // if(pthread_create(&xthrd,NULL, tbmqtt_db_cache_thrd_main, NULL)!=0){
  420. // log_dbg( "%s, create tbmqtt_db_cache_thrd_main fail", __func__);
  421. // }
  422. tbmqtt_sm_init(sm);
  423. while (1)
  424. {
  425. tbmqtt_sm();
  426. // tbmqtt_deal_with_cache();
  427. usleep(10000); /* 10ms */
  428. }
  429. log_dbg("%s, --", __func__);
  430. return NULL;
  431. }
  432. int tbmqtt_set_dbg(int val)
  433. {
  434. struct tbmqtt_t *dev = &tbmqtt;
  435. dev->dbg = val;
  436. return 0;
  437. }
  438. static int tbmqtt_dbcb_0(void *para, int ncolumn, char **columnvalue, char *columnname[])
  439. {
  440. int i;
  441. struct dbcbparam_t *pcbparam = (struct dbcbparam_t *)para;
  442. struct tbmqtt_t *dev = &tbmqtt;
  443. pcbparam->nrow++;
  444. log_dbg("%s, ++,row:%d, col:%d", __func__, pcbparam->nrow, ncolumn);
  445. if (pcbparam->nrow > 1)
  446. {
  447. log_dbg("%s, tbmqtt cfg rows is more than 1!", __func__);
  448. return -1;
  449. }
  450. for (i = 0; i < ncolumn; i++)
  451. {
  452. if (strcmp("enable", columnname[i]) == 0)
  453. {
  454. dev->enable = atoi(columnvalue[i]);
  455. }
  456. else if (strcmp("servip", columnname[i]) == 0)
  457. {
  458. strcpy(dev->szservip, columnvalue[i]);
  459. }
  460. else if (strcmp("servport", columnname[i]) == 0)
  461. {
  462. dev->servport = atoi(columnvalue[i]);
  463. }
  464. else if (strcmp("clientid", columnname[i]) == 0)
  465. {
  466. strcpy(dev->szclientid, columnvalue[i]);
  467. }
  468. else if (strcmp("username", columnname[i]) == 0)
  469. {
  470. strcpy(dev->szusername, columnvalue[i]);
  471. }
  472. else if (strcmp("passwd", columnname[i]) == 0)
  473. {
  474. strcpy(dev->szpasswd, columnvalue[i]);
  475. }
  476. else if (strcmp("accesstoken", columnname[i]) == 0)
  477. {
  478. strcpy(dev->szaccesstoken, columnvalue[i]);
  479. }
  480. else if (strcmp("timezone", columnname[i]) == 0)
  481. {
  482. dev->timezone = atoi(columnvalue[i]);
  483. }
  484. }
  485. pcbparam->ret = 0;
  486. log_dbg("%s, --,ret:%d", __func__, pcbparam->ret);
  487. return 0;
  488. }
  489. int tbmqtt_reset()
  490. {
  491. tbmqtt_lock_txbuf();
  492. tbmqtt_init_rxbuf();
  493. tbmqtt_init_txbuf();
  494. tbmqtt_unlock_txbuf();
  495. }
  496. int tbmqtt_get_cmd()
  497. {
  498. return tbmqtt.cmd;
  499. }
  500. void tbmqtt_reset_cmd()
  501. {
  502. tbmqtt.cmd = CMD_SM_DONE;
  503. }
  504. int tbmqtt_init()
  505. {
  506. pthread_t xthrd;
  507. int rc = 0;
  508. int ret = 0;
  509. struct tbmqtt_t *dev = &tbmqtt;
  510. char *errmsg = NULL;
  511. char sql[1024];
  512. struct dbcbparam_t cbparam;
  513. sqlite3 *db = NULL;
  514. log_dbg("%s, ++", __func__);
  515. plt_lock_projdb();
  516. db = plt_get_projdb();
  517. sprintf(sql, "select * from tbmqtt");
  518. cbparam.nrow = 0;
  519. rc = sqlite3_exec(db, sql, tbmqtt_dbcb_0, (void *)&cbparam, &errmsg);
  520. plt_unlock_projdb();
  521. if (rc != SQLITE_OK)
  522. {
  523. ret = -1;
  524. }
  525. else if (cbparam.ret != 0)
  526. {
  527. ret = -2;
  528. }
  529. else
  530. {
  531. if (pthread_create(&xthrd, NULL, tbmqtt_thrd_main, NULL) != 0)
  532. {
  533. log_dbg("%s, create tbmqtt_thrd_sm fail", __func__);
  534. ret = -1;
  535. }
  536. }
  537. log_dbg("%s--, ret:%d", __func__, ret);
  538. return ret;
  539. }
  540. int tbmqtt_get_txbuf_used(void)
  541. {
  542. return tbmqtt_ringbuffer_num_items(&tbmqtt_txbuf);
  543. }
  544. int tbmqtt_get_txbuf_size(void)
  545. {
  546. return tbmqtt_ringbuffer_size(&tbmqtt_txbuf);
  547. }
  548. int tbmqtt_get_rxbuf_used(void)
  549. {
  550. return tbmqtt_ringbuffer_num_items(&tbmqtt_rxbuf);
  551. }
  552. int tbmqtt_get_rxbuf_size(void)
  553. {
  554. return tbmqtt_ringbuffer_size(&tbmqtt_rxbuf);
  555. }
  556. int tbmqtt_get_tz()
  557. {
  558. return tbmqtt.timezone;
  559. }
  560. char *tbmqtt_get_state_str(void)
  561. {
  562. return tbmqtt.sm.szState;
  563. }
  564. int tbmqtt_get_stp(void)
  565. {
  566. return tbmqtt.sm.step;
  567. }
  568. char *tbmqtt_get_err_str(void)
  569. {
  570. return tbmqtt.sm.szerr;
  571. }
  572. int tbmqtt_get_tick(void)
  573. {
  574. return tbmqtt.sm.tick;
  575. }
  576. double tbmqtt_get_timing_ave(void)
  577. {
  578. return tbmqtt.sm.timing_ave;
  579. }
  580. double tbmqtt_get_timing_cur(void)
  581. {
  582. return tbmqtt.sm.timing_cur;
  583. }
  584. double tbmqtt_get_timing_max(void)
  585. {
  586. return tbmqtt.sm.timing_max;
  587. }
  588. int tbmqtt_get_enable(void)
  589. {
  590. return tbmqtt.enable;
  591. }
  592. char *tbmqtt_get_servip_str(void)
  593. {
  594. return tbmqtt.szservip;
  595. }
  596. int tbmqtt_get_servport(void)
  597. {
  598. return tbmqtt.servport;
  599. }
  600. char *tbmqtt_get_client_id(void)
  601. {
  602. return tbmqtt.szclientid;
  603. }
  604. double tbmqtt_get_txbuf_usage(void)
  605. {
  606. return tbmqtt.txbuf_usage;
  607. }
  608. char *tbmqtt_get_access_token(void)
  609. {
  610. return tbmqtt.szaccesstoken;
  611. }
  612. int tbmqtt_get_tool_data(char *buf)
  613. {
  614. struct tbmqtt_t *dev = &tbmqtt;
  615. struct statemachine_t *sm = &dev->sm;
  616. char buf_temp[8192];
  617. sprintf(buf, " TBMQTT ");
  618. sm_get_summary(sm, buf_temp, sizeof(buf_temp));
  619. strcat(buf, buf_temp);
  620. sprintf(buf_temp, " en:%d tz:%d servip:%s servport:%d clientid:%s accesstoken:%s txbufusage:%.1f rxbufusage:%.1f lost count:%d \n",
  621. dev->enable, dev->timezone, dev->szservip, dev->servport, dev->szclientid, dev->szaccesstoken, dev->txbuf_usage, dev->rxbuf_usage, dev->connLostCnt);
  622. strcat(buf, buf_temp);
  623. sprintf(buf_temp, " dbg:%d", dev->dbg);
  624. strcat(buf, buf_temp);
  625. return 0;
  626. }