tbmqtt_cache.c 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208
  1. #include <stdio.h>
  2. #include <string.h>
  3. #include <stdlib.h>
  4. #include "sqlite3.h"
  5. #include "tbmqtt_cache.h"
  6. #include "log.h"
  7. static int callback_read_one_item(void* para, int ncolumn, char** columnvalue, char* columnname[]);
  8. static int callback_count(void* para, int ncolumn, char** columnvalue, char* columnname[]);
  9. static int callback_pageSize(void* para, int ncolumn, char** columnvalue, char* columnname[]);
  10. static int callback_pageCount(void* para, int ncolumn, char** columnvalue, char* columnname[]);
  11. struct tbmqtt_cache_sqlite_data {
  12. int* rowid;
  13. void* data;
  14. };
  15. int tbmqtt_cache_open(char* dir,char* table_name, void** handle)
  16. {
  17. printf("%s,dir:%s,table:%s\n",__func__,dir,table_name);
  18. int nResult = sqlite3_open(dir, (sqlite3**) handle);
  19. char sql_creat_table[128];
  20. if (nResult != SQLITE_OK) {
  21. //printf("open success\n");
  22. log_info("%s,dir:%s,table:%s\n",__func__,dir,table_name);
  23. return -1;
  24. }
  25. //open table
  26. memset(sql_creat_table, 0, 128);
  27. sprintf(sql_creat_table, "CREATE TABLE [%s] ([topic] TEXT(%d),[payload] TEXT(%d))", table_name, BUF_TOPIC_MAX_LEN,BUF_PAYLOAD_MAX_LEN);
  28. if (sqlite3_exec((sqlite3 *)( * handle), sql_creat_table, NULL, NULL, NULL) != SQLITE_OK) {
  29. //exit
  30. return 0;
  31. }
  32. else {
  33. //create table success
  34. return 0;
  35. }
  36. return 0;
  37. }
  38. int tbmqtt_cache_read_one_payload(void* handle,char* table_name , tbmqtt_ringbuffer_element_t *e,int *idx)
  39. {
  40. char sql_select[128];
  41. struct tbmqtt_cache_sqlite_data stTemp;
  42. memset(sql_select, 0, 128);
  43. sprintf(sql_select,"select rowid,* from %s limit 1", table_name);
  44. stTemp.rowid = idx;
  45. stTemp.data = (void *)e;
  46. if (sqlite3_exec((sqlite3*)handle, sql_select, callback_read_one_item, (void *)&stTemp, NULL) != SQLITE_OK)
  47. {
  48. //printf("select data from db error.\n");
  49. log_info("%s,read from table(%s) failed\n",__func__,table_name);
  50. return -1;
  51. }
  52. return 0;
  53. }
  54. int tbmqtt_cache_write_one_payload(void* handle, char* table_name , tbmqtt_ringbuffer_element_t e)
  55. {
  56. char sql_insert[BUF_PAYLOAD_MAX_LEN + BUF_TOPIC_MAX_LEN + 128];
  57. memset(sql_insert, 0, BUF_PAYLOAD_MAX_LEN + BUF_TOPIC_MAX_LEN + 128);
  58. sprintf(sql_insert, "insert into %s values(\"%s\",\"%s\")", table_name ,e.sztopic,e.szpayload);
  59. if (sqlite3_exec((sqlite3*)handle, sql_insert, NULL, NULL, NULL) != SQLITE_OK)
  60. {
  61. //printf("insert data from db error.\n");
  62. log_info("%s,write to table(%s) failed\n",__func__,table_name);
  63. return -1;
  64. }
  65. return 0;
  66. }
  67. int tbmqtt_cache_delete_one_payload(void* handle, char* table_name ,int rowid)
  68. {
  69. char sql_delete[128];
  70. memset(sql_delete, 0, 128);
  71. sprintf(sql_delete,"delete from %s where rowid = %d", table_name,rowid);
  72. if (sqlite3_exec((sqlite3*)handle, sql_delete, NULL, NULL, NULL) != SQLITE_OK)
  73. {
  74. //printf("delete data from db error.\n");
  75. log_info("%s,delete from table(%s) rowid(%d) failed\n",__func__,table_name,rowid);
  76. return -1;
  77. }
  78. return 0;
  79. }
  80. int tbmqtt_cache_get_payload_nb(void* handle, char* table_name, int* nb)
  81. {
  82. char sql_select[128];
  83. memset(sql_select, 0, 128);
  84. sprintf(sql_select, "select count(1) from %s", table_name);
  85. if (sqlite3_exec((sqlite3*)handle, sql_select, callback_count, (void *)nb, NULL) != SQLITE_OK)
  86. {
  87. //printf("get payload nb error.\n");
  88. log_info("%s,get itmNb from table(%s) failed\n",__func__,table_name);
  89. return -1;
  90. }
  91. return 0;
  92. }
  93. int tbmqtt_cache_get_memory_size(void* handle, long* size)
  94. {
  95. char sql_page_size[128];
  96. char sql_page_count[128];
  97. int page_size = 0;
  98. int page_count = 0;
  99. memset(sql_page_size, 0, 128);
  100. sprintf(sql_page_size, "PRAGMA PAGE_SIZE");
  101. memset(sql_page_count, 0, 128);
  102. sprintf(sql_page_count, "PRAGMA PAGE_COUNT");
  103. if (sqlite3_exec((sqlite3*)handle, sql_page_size, callback_pageSize, (void*)&page_size, NULL) != SQLITE_OK)
  104. {
  105. //printf("get payload nb error.\n");
  106. return -1;
  107. }
  108. else if (sqlite3_exec((sqlite3*)handle, sql_page_count, callback_pageCount, (void*)&page_count, NULL) != SQLITE_OK) {
  109. return -2;
  110. }
  111. *size = page_size * page_count;
  112. return 0;
  113. }
  114. int tbmqtt_cache_free_memory(void* handle)
  115. {
  116. char sql_select[128];
  117. memset(sql_select, 0, 128);
  118. sprintf(sql_select, "vacuum");
  119. if (sqlite3_exec((sqlite3*)handle, sql_select, NULL, NULL, NULL) != SQLITE_OK)
  120. {
  121. log_info("free memory exec failed\n");
  122. return -1;
  123. }
  124. return 0;
  125. }
  126. void tbmqtt_cache_close(void* handle)
  127. {
  128. sqlite3_close((sqlite3*)handle);
  129. }
  130. static int callback_read_one_item(void* para, int ncolumn, char** columnvalue, char* columnname[])
  131. {
  132. struct tbmqtt_cache_sqlite_data *stTemp = (struct tbmqtt_cache_sqlite_data *)para;
  133. tbmqtt_ringbuffer_element_t* e = (tbmqtt_ringbuffer_element_t *)(stTemp->data);
  134. int i = 0;
  135. for (i = 0; i < ncolumn; i++) {
  136. if (strcmp("rowid", (const char*)columnname[i]) == 0) {
  137. *(stTemp->rowid) = atoi(columnvalue[i]);
  138. }
  139. else if (strcmp("topic", (const char*)columnname[i]) == 0) {
  140. strcpy(e->sztopic, columnvalue[i]);
  141. }
  142. else if (strcmp("payload", (const char*)columnname[i]) == 0) {
  143. strcpy(e->szpayload, columnvalue[i]);
  144. }
  145. }
  146. return 0;
  147. }
  148. static int callback_count(void* para, int ncolumn, char** columnvalue, char* columnname[])
  149. {
  150. if (ncolumn > 0) {
  151. *((int*)para) = atoi(columnvalue[0]);
  152. return 0;
  153. }
  154. else {
  155. return -1;
  156. }
  157. }
  158. static int callback_pageSize(void* para, int ncolumn, char** columnvalue, char* columnname[])
  159. {
  160. if (ncolumn > 0) {
  161. *((int*)para) = atoi(columnvalue[0]);
  162. return 0;
  163. }
  164. else {
  165. return -1;
  166. }
  167. }
  168. static int callback_pageCount(void* para, int ncolumn, char** columnvalue, char* columnname[])
  169. {
  170. if (ncolumn > 0) {
  171. *((int*)para) = atoi(columnvalue[0]);
  172. return 0;
  173. }
  174. else {
  175. return -1;
  176. }
  177. }