mqtt_cache.c 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. #include "plt.h"
  2. #include "mqtt_cache.h"
  /* Byte budgets for one cached payload / topic string. They are used in this
   * file as the declared TEXT widths of the table columns. */
  #define BUF_PAYLOAD_MAX_LEN (8000)
  #define BUF_TOPIC_MAX_LEN (128)

  /* sqlite3_exec() row callbacks; the definitions are at the end of this file. */
  static int callback_read_one_item(void *ctx, int ncol, char **values, char **names);
  static int callback_count(void *ctx, int ncol, char **values, char **names);
  static int callback_pageSize(void *ctx, int ncol, char **values, char **names);
  static int callback_pageCount(void *ctx, int ncol, char **values, char **names);
  /* Context passed to callback_read_one_item() through sqlite3_exec(). */
  struct mqtt_cache_sqlite_data
  {
      /* Where the callback stores the rowid of the row it read. */
      int *rowid;
      /* Destination element; the callback casts it to mqtt_ringbuffer_element_t *. */
      void *data;
  };
  14. int mqtt_cache_open(const char *dir, const char *table_name, void **handle)
  15. {
  16. printf("%s,dir:%s,table:%s\n", __func__, dir, table_name);
  17. int nResult = sqlite3_open(dir, (sqlite3 **)handle);
  18. char sql_creat_table[128];
  19. if (nResult != SQLITE_OK)
  20. {
  21. // printf("open success\n");
  22. log_info("%s,dir:%s,table:%s\n", __func__, dir, table_name);
  23. return -1;
  24. }
  25. // open table
  26. memset(sql_creat_table, 0, 128);
  27. sprintf(sql_creat_table, "CREATE TABLE [%s] ([topic] TEXT(%d),[payload] TEXT(%d))", table_name, BUF_TOPIC_MAX_LEN, BUF_PAYLOAD_MAX_LEN);
  28. if (sqlite3_exec((sqlite3 *)(*handle), sql_creat_table, NULL, NULL, NULL) != SQLITE_OK)
  29. {
  30. // exit
  31. return 0;
  32. }
  33. else
  34. {
  35. // create table success
  36. return 0;
  37. }
  38. return 0;
  39. }
  40. int mqtt_cache_read_one_payload(void *handle, const char *table_name, mqtt_ringbuffer_element_t *e, int *idx)
  41. {
  42. char sql_select[128];
  43. struct mqtt_cache_sqlite_data stTemp;
  44. memset(sql_select, 0, 128);
  45. sprintf(sql_select, "select rowid,* from %s limit 1", table_name);
  46. stTemp.rowid = idx;
  47. stTemp.data = (void *)e;
  48. if (sqlite3_exec((sqlite3 *)handle, sql_select, callback_read_one_item, (void *)&stTemp, NULL) != SQLITE_OK)
  49. {
  50. // printf("select data from db error.\n");
  51. log_info("%s,read from table(%s) failed\n", __func__, table_name);
  52. return -1;
  53. }
  54. return 0;
  55. }
  56. int mqtt_cache_write_one_payload(void *handle, const char *table_name, mqtt_ringbuffer_element_t e)
  57. {
  58. char sql_insert[BUF_PAYLOAD_MAX_LEN + BUF_TOPIC_MAX_LEN + 128];
  59. memset(sql_insert, 0, BUF_PAYLOAD_MAX_LEN + BUF_TOPIC_MAX_LEN + 128);
  60. sprintf(sql_insert, "insert into %s values(\"%s\",\"%s\")", table_name, e.sztopic, e.szpayload);
  61. if (sqlite3_exec((sqlite3 *)handle, sql_insert, NULL, NULL, NULL) != SQLITE_OK)
  62. {
  63. // printf("insert data from db error.\n");
  64. log_info("%s,write to table(%s) failed\n", __func__, table_name);
  65. return -1;
  66. }
  67. return 0;
  68. }
  69. int mqtt_cache_delete_one_payload(void *handle, const char *table_name, int rowid)
  70. {
  71. char sql_delete[128];
  72. memset(sql_delete, 0, 128);
  73. sprintf(sql_delete, "delete from %s where rowid = %d", table_name, rowid);
  74. if (sqlite3_exec((sqlite3 *)handle, sql_delete, NULL, NULL, NULL) != SQLITE_OK)
  75. {
  76. // printf("delete data from db error.\n");
  77. log_info("%s,delete from table(%s) rowid(%d) failed\n", __func__, table_name, rowid);
  78. return -1;
  79. }
  80. return 0;
  81. }
  82. int mqtt_cache_get_payload_nb(void *handle, const char *table_name, int *nb)
  83. {
  84. char sql_select[128];
  85. memset(sql_select, 0, 128);
  86. sprintf(sql_select, "select count(1) from %s", table_name);
  87. if (sqlite3_exec((sqlite3 *)handle, sql_select, callback_count, (void *)nb, NULL) != SQLITE_OK)
  88. {
  89. // printf("get payload nb error.\n");
  90. log_info("%s,get itmNb from table(%s) failed\n", __func__, table_name);
  91. return -1;
  92. }
  93. return 0;
  94. }
  95. int mqtt_cache_free_memory(void *handle)
  96. {
  97. char sql_select[128];
  98. memset(sql_select, 0, 128);
  99. sprintf(sql_select, "vacuum");
  100. if (sqlite3_exec((sqlite3 *)handle, sql_select, NULL, NULL, NULL) != SQLITE_OK)
  101. {
  102. log_info("free memory exec failed\n");
  103. return -1;
  104. }
  105. return 0;
  106. }
  107. int mqtt_cache_get_memory_size(void *handle, long *size)
  108. {
  109. char sql_page_size[128];
  110. char sql_page_count[128];
  111. int page_size = 0;
  112. int page_count = 0;
  113. memset(sql_page_size, 0, 128);
  114. sprintf(sql_page_size, "PRAGMA PAGE_SIZE");
  115. memset(sql_page_count, 0, 128);
  116. sprintf(sql_page_count, "PRAGMA PAGE_COUNT");
  117. if (sqlite3_exec((sqlite3 *)handle, sql_page_size, callback_pageSize, (void *)&page_size, NULL) != SQLITE_OK)
  118. {
  119. // printf("get payload nb error.\n");
  120. return -1;
  121. }
  122. else if (sqlite3_exec((sqlite3 *)handle, sql_page_count, callback_pageCount, (void *)&page_count, NULL) != SQLITE_OK)
  123. {
  124. return -2;
  125. }
  126. *size = page_size * page_count;
  127. return 0;
  128. }
  129. void mqtt_cache_close(void *handle)
  130. {
  131. sqlite3_close((sqlite3 *)handle);
  132. }
  133. static int callback_read_one_item(void *para, int ncolumn, char **columnvalue, char *columnname[])
  134. {
  135. struct mqtt_cache_sqlite_data *stTemp = (struct mqtt_cache_sqlite_data *)para;
  136. mqtt_ringbuffer_element_t *e = (mqtt_ringbuffer_element_t *)(stTemp->data);
  137. int i = 0;
  138. for (i = 0; i < ncolumn; i++)
  139. {
  140. if (strcmp("rowid", (const char *)columnname[i]) == 0)
  141. {
  142. *(stTemp->rowid) = atoi(columnvalue[i]);
  143. }
  144. else if (strcmp("topic", (const char *)columnname[i]) == 0)
  145. {
  146. strcpy(e->sztopic, columnvalue[i]);
  147. }
  148. else if (strcmp("payload", (const char *)columnname[i]) == 0)
  149. {
  150. strcpy(e->szpayload, columnvalue[i]);
  151. }
  152. }
  153. return 0;
  154. }
  /**
   * sqlite3_exec() row callback that stores the first column of the row,
   * parsed as a decimal integer, into the int that para points to.
   *
   * @param para         int * receiving the value.
   * @param ncolumn      Number of columns in the row.
   * @param columnvalue  Column values as strings (an entry may be NULL).
   * @param columnname   Column names (unused).
   * @return 0 when a value was stored; -1 when there is no usable first
   *         column or para is NULL. *para is left untouched in that case.
   */
  static int callback_count(void *para, int ncolumn, char **columnvalue, char *columnname[])
  {
      (void)columnname;

      /* atoi(NULL) is undefined, so reject a missing value up front. */
      if (para == NULL || ncolumn <= 0 || columnvalue == NULL || columnvalue[0] == NULL)
      {
          return -1;
      }

      *((int *)para) = atoi(columnvalue[0]);
      return 0;
  }
  /**
   * sqlite3_exec() row callback used for "PRAGMA PAGE_SIZE": stores the first
   * column of the row, parsed as a decimal integer, into the int that para
   * points to.
   *
   * @param para         int * receiving the value.
   * @param ncolumn      Number of columns in the row.
   * @param columnvalue  Column values as strings (an entry may be NULL).
   * @param columnname   Column names (unused).
   * @return 0 when a value was stored; -1 when there is no usable first
   *         column or para is NULL. *para is left untouched in that case.
   */
  static int callback_pageSize(void *para, int ncolumn, char **columnvalue, char *columnname[])
  {
      (void)columnname;

      /* atoi(NULL) is undefined, so reject a missing value up front. */
      if (para == NULL || ncolumn <= 0 || columnvalue == NULL || columnvalue[0] == NULL)
      {
          return -1;
      }

      *((int *)para) = atoi(columnvalue[0]);
      return 0;
  }
  /**
   * sqlite3_exec() row callback used for "PRAGMA PAGE_COUNT": stores the
   * first column of the row, parsed as a decimal integer, into the int that
   * para points to.
   *
   * @param para         int * receiving the value.
   * @param ncolumn      Number of columns in the row.
   * @param columnvalue  Column values as strings (an entry may be NULL).
   * @param columnname   Column names (unused).
   * @return 0 when a value was stored; -1 when there is no usable first
   *         column or para is NULL. *para is left untouched in that case.
   */
  static int callback_pageCount(void *para, int ncolumn, char **columnvalue, char *columnname[])
  {
      (void)columnname;

      /* atoi(NULL) is undefined, so reject a missing value up front. */
      if (para == NULL || ncolumn <= 0 || columnvalue == NULL || columnvalue[0] == NULL)
      {
          return -1;
      }

      *((int *)para) = atoi(columnvalue[0]);
      return 0;
  }