mqtt_sm.c 6.5 KB


  1. #include "plt.h"
  2. #include "mqtt_sm.h"
  3. static struct state_t mqtt_states[] = {
  4. {SMST_OFFLINE, "offline"},
  5. {SMST_READY, "ready"},
  6. };
  7. static struct err_t mqtt_errs[] = {
  8. {MQTTERR_NONE, "none"},
  9. // offline
  10. {MQTTERR_OFFLINE_PWRUP, "pwrup"},
  11. };
  12. int mqtt_sm_init()
  13. {
  14. struct statemachine_t *sm = &MDL.mqtt.sm;
  15. sm_reset_timing(sm, 100, 25);
  16. sm->states = mqtt_states;
  17. sm->state_nbr = sizeof(mqtt_states) / sizeof(struct state_t);
  18. sm->errs = mqtt_errs;
  19. sm->err_nbr = sizeof(mqtt_errs) / sizeof(struct err_t);
  20. sm_set_state(sm, SMST_OFFLINE, MQTTERR_OFFLINE_PWRUP);
  21. return 0;
  22. }
  23. static void mqtt_sm_offline()
  24. {
  25. struct mqtt_t *dev = &MDL.mqtt;
  26. struct statemachine_t *sm = &MDL.mqtt.sm;
  27. // log_dbg("mqtt sm offline, state:%s, step:%d", sm_get_szstate(sm), sm_get_step(sm));
  28. if (sm_get_step(sm) == 0)
  29. { // entry
  30. mqtt_reset_cmd();
  31. sm_set_step(sm, 10);
  32. }
  33. else if (sm_get_step(sm) == 10)
  34. { // wait cmd
  35. if (mqtt_get_cmd() == CMD_SM_READY)
  36. { // ready cmd
  37. log_dbg("%s, state:%s, step:%d, get ready cmd, try to connect", __func__, sm_get_szstate(sm), sm_get_step(sm));
  38. mqtt_reset_cmd();
  39. if (mqtt_connect() < 0)
  40. {
  41. if (dev->cli != NULL)
  42. {
  43. MQTTClient_destroy(&dev->cli);
  44. }
  45. log_dbg("%s, state:%s, step:%d, connect fail, re-connet 10 seconds later", __func__, sm_get_szstate(sm), sm_get_step(sm));
  46. sm_set_count(sm, 0);
  47. }
  48. else
  49. {
  50. dev->connlost = 0;
  51. log_dbg("%s, state:%s, step:%d, connect ok, goto ready", __func__, sm_get_szstate(sm), sm_get_step(sm));
  52. sm_set_state(sm, SMST_READY, MQTTERR_NONE);
  53. }
  54. }
  55. else
  56. {
  57. sm_inc_count(sm);
  58. // log_dbg("%s, state:%s, step:%d, wait ready cmd, count:%d", __func__, sm_get_szstate(sm), sm_get_step(sm), sm_get_count(sm));
  59. if (sm_get_count(sm) >= 10)
  60. { /* 10 seconds */
  61. sm_set_count(sm, 0);
  62. if (dev->enable)
  63. {
  64. log_dbg("%s, state:%s, step:%d, 10 seconds passed, try to connect", __func__, sm_get_szstate(sm), sm_get_step(sm));
  65. if (mqtt_connect() < 0)
  66. {
  67. if (dev->cli != NULL)
  68. {
  69. MQTTClient_destroy(&dev->cli);
  70. }
  71. log_dbg("%s, state:%s, step:%d, connect fail, re-connet 10 seconds later", __func__, sm_get_szstate(sm), sm_get_step(sm));
  72. sm_set_count(sm, 0);
  73. }
  74. else
  75. {
  76. dev->connlost = 0;
  77. log_dbg("%s, state:%s, step:%d, connect ok, goto stdby", __func__, sm_get_szstate(sm), sm_get_step(sm));
  78. sm_set_state(sm, SMST_READY, MQTTERR_NONE);
  79. }
  80. }
  81. else
  82. {
  83. ; /* nothing to do */
  84. }
  85. }
  86. }
  87. }
  88. }
  89. static void mqtt_sm_ready()
  90. {
  91. struct mqtt_t *dev = &MDL.mqtt;
  92. struct statemachine_t *sm = &MDL.mqtt.sm;
  93. int rc = 0;
  94. mqtt_ringbuffer_element_t e;
  95. char *topicName = NULL;
  96. int topicLen;
  97. MQTTClient_message *message = NULL;
  98. char *pdst = NULL;
  99. char *psrc = NULL;
  100. int i;
  101. if (sm_get_step(sm) == 0)
  102. { // entry
  103. mqtt_reset_cmd();
  104. sm_set_step(sm, 10);
  105. }
  106. else if (sm_get_step(sm) == 10)
  107. { // wait and chk
  108. if (mqtt_get_cmd() == CMD_SM_OFFLINE)
  109. { // offline cmd
  110. log_dbg("%s, state:%s, step:%d, get offline cmd, goto offline",
  111. __func__, sm_get_szstate(sm), sm_get_step(sm));
  112. mqtt_reset_cmd();
  113. if (dev->cli != NULL)
  114. {
  115. MQTTClient_disconnect(dev->cli, 0);
  116. MQTTClient_destroy(&dev->cli);
  117. }
  118. sm_set_state(sm, SMST_OFFLINE, MQTTERR_NONE);
  119. }
  120. else if (dev->connlost)
  121. {
  122. log_dbg("%s, state:%s, step:%d, connection lost detected, goto offline", __func__, sm_get_szstate(sm), sm_get_step(sm));
  123. if (dev->cli != NULL)
  124. {
  125. MQTTClient_destroy(&dev->cli);
  126. }
  127. sm_set_state(sm, SMST_OFFLINE, MQTTERR_NONE);
  128. }
  129. else
  130. {
  131. // rc = MQTTClient_receive(dev->cli, &topicName, &topicLen, &message, 30);
  132. // if (message){
  133. // log_dbg("%s, Message arrived, topic:%s topic len:%d payload len:%d", __func__, topicName,topicLen, message->payloadlen);
  134. // pdst = e.szpayload;
  135. // psrc = message->payload;
  136. // for( i = 0; i < message->payloadlen; i++ ){
  137. // *pdst++ = *psrc++;
  138. // }
  139. // *pdst = 0;
  140. // log_dbg("%s, payload:%s", __func__, e.szpayload);
  141. // MQTTClient_freeMessage(&message);
  142. // MQTTClient_free(topicName);
  143. // mqtt_lock_rxbuf();
  144. // mqtt_queue_rxbuf(e);
  145. // mqtt_unlock_rxbuf();
  146. // }
  147. // if (rc != 0){
  148. // dev->connlost = 1;
  149. // log_dbg("%s, MQTTClient_receive fail:%d, set connlost=1", __func__, rc);
  150. // }
  151. mqtt_lock_txbuf();
  152. rc = mqtt_peek_txbuf(&e, 0);
  153. mqtt_unlock_txbuf();
  154. if (rc == 1)
  155. {
  156. if (mqtt_pub(e.sztopic, e.szpayload) == 0)
  157. {
  158. mqtt_lock_txbuf();
  159. mqtt_dequeue_txbuf(&e);
  160. mqtt_unlock_txbuf();
  161. }
  162. else
  163. {
  164. log_dbg("failed to pub: %s : %s", e.sztopic, e.szpayload);
  165. }
  166. }
  167. }
  168. }
  169. }
  170. void mqtt_sm(void)
  171. {
  172. struct mqtt_t *dev = &MDL.mqtt;
  173. struct statemachine_t *sm = &dev->sm;
  174. sm_cal_timing(sm);
  175. dev->txbuf_usage = (double)mqtt_get_txbuf_used() / (double)mqtt_get_txbuf_size() * 100;
  176. dev->rxbuf_usage = (double)mqtt_get_rxbuf_used() / (double)mqtt_get_rxbuf_size() * 100;
  177. // log_dbg("%s, mqtt state : %d", __func__, mqtt_get_state());
  178. switch (mqtt_get_state())
  179. {
  180. case SMST_OFFLINE:
  181. mqtt_sm_offline();
  182. break;
  183. case SMST_READY:
  184. mqtt_sm_ready();
  185. break;
  186. default:
  187. log_dbg("%s, never reach here", __func__);
  188. break;
  189. }
  190. }