tbmqtt_cache.c 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. #include "plt.h"
  2. #include "tbmqtt_cache.h"
  3. static int callback_read_one_item(void* para, int ncolumn, char** columnvalue, char* columnname[]);
  4. static int callback_count(void* para, int ncolumn, char** columnvalue, char* columnname[]);
  5. static int callback_pageSize(void* para, int ncolumn, char** columnvalue, char* columnname[]);
  6. static int callback_pageCount(void* para, int ncolumn, char** columnvalue, char* columnname[]);
  7. struct tbmqtt_cache_sqlite_data {
  8. int* rowid;
  9. void* data;
  10. };
  11. int tbmqtt_cache_open(const char* dir,const char* table_name, void** handle)
  12. {
  13. printf("%s,dir:%s,table:%s\n",__func__,dir,table_name);
  14. int nResult = sqlite3_open(dir, (sqlite3**) handle);
  15. char sql_creat_table[128];
  16. if (nResult != SQLITE_OK) {
  17. //printf("open success\n");
  18. log_info("%s,dir:%s,table:%s\n",__func__,dir,table_name);
  19. return -1;
  20. }
  21. //open table
  22. memset(sql_creat_table, 0, 128);
  23. sprintf(sql_creat_table, "CREATE TABLE [%s] ([topic] TEXT(%d),[payload] TEXT(%d))", table_name, BUF_TOPIC_MAX_LEN,BUF_PAYLOAD_MAX_LEN);
  24. if (sqlite3_exec((sqlite3 *)( * handle), sql_creat_table, NULL, NULL, NULL) != SQLITE_OK) {
  25. //exit
  26. return 0;
  27. }
  28. else {
  29. //create table success
  30. return 0;
  31. }
  32. return 0;
  33. }
  34. int tbmqtt_cache_read_one_payload(void* handle,const char* table_name , tbmqtt_ringbuffer_element_t *e,int *idx)
  35. {
  36. char sql_select[128];
  37. struct tbmqtt_cache_sqlite_data stTemp;
  38. memset(sql_select, 0, 128);
  39. sprintf(sql_select,"select rowid,* from %s limit 1", table_name);
  40. stTemp.rowid = idx;
  41. stTemp.data = (void *)e;
  42. if (sqlite3_exec((sqlite3*)handle, sql_select, callback_read_one_item, (void *)&stTemp, NULL) != SQLITE_OK)
  43. {
  44. //printf("select data from db error.\n");
  45. log_info("%s,read from table(%s) failed\n",__func__,table_name);
  46. return -1;
  47. }
  48. return 0;
  49. }
  50. int tbmqtt_cache_write_one_payload(void* handle, const char* table_name , tbmqtt_ringbuffer_element_t e)
  51. {
  52. char sql_insert[BUF_PAYLOAD_MAX_LEN + BUF_TOPIC_MAX_LEN + 128];
  53. memset(sql_insert, 0, BUF_PAYLOAD_MAX_LEN + BUF_TOPIC_MAX_LEN + 128);
  54. sprintf(sql_insert, "insert into %s values(\"%s\",\"%s\")", table_name ,e.sztopic,e.szpayload);
  55. if (sqlite3_exec((sqlite3*)handle, sql_insert, NULL, NULL, NULL) != SQLITE_OK)
  56. {
  57. //printf("insert data from db error.\n");
  58. log_info("%s,write to table(%s) failed\n",__func__,table_name);
  59. return -1;
  60. }
  61. return 0;
  62. }
  63. int tbmqtt_cache_delete_one_payload(void* handle, const char* table_name ,int rowid)
  64. {
  65. char sql_delete[128];
  66. memset(sql_delete, 0, 128);
  67. sprintf(sql_delete,"delete from %s where rowid = %d", table_name,rowid);
  68. if (sqlite3_exec((sqlite3*)handle, sql_delete, NULL, NULL, NULL) != SQLITE_OK)
  69. {
  70. //printf("delete data from db error.\n");
  71. log_info("%s,delete from table(%s) rowid(%d) failed\n",__func__,table_name,rowid);
  72. return -1;
  73. }
  74. return 0;
  75. }
  76. int tbmqtt_cache_get_payload_nb(void* handle, const char* table_name, int* nb)
  77. {
  78. char sql_select[128];
  79. memset(sql_select, 0, 128);
  80. sprintf(sql_select, "select count(1) from %s", table_name);
  81. if (sqlite3_exec((sqlite3*)handle, sql_select, callback_count, (void *)nb, NULL) != SQLITE_OK)
  82. {
  83. //printf("get payload nb error.\n");
  84. log_info("%s,get itmNb from table(%s) failed\n",__func__,table_name);
  85. return -1;
  86. }
  87. return 0;
  88. }
  89. int tbmqtt_cache_get_memory_size(void* handle, long* size)
  90. {
  91. char sql_page_size[128];
  92. char sql_page_count[128];
  93. int page_size = 0;
  94. int page_count = 0;
  95. memset(sql_page_size, 0, 128);
  96. sprintf(sql_page_size, "PRAGMA PAGE_SIZE");
  97. memset(sql_page_count, 0, 128);
  98. sprintf(sql_page_count, "PRAGMA PAGE_COUNT");
  99. if (sqlite3_exec((sqlite3*)handle, sql_page_size, callback_pageSize, (void*)&page_size, NULL) != SQLITE_OK)
  100. {
  101. //printf("get payload nb error.\n");
  102. return -1;
  103. }
  104. else if (sqlite3_exec((sqlite3*)handle, sql_page_count, callback_pageCount, (void*)&page_count, NULL) != SQLITE_OK) {
  105. return -2;
  106. }
  107. *size = page_size * page_count;
  108. return 0;
  109. }
  110. int tbmqtt_cache_free_memory(void* handle)
  111. {
  112. char sql_select[128];
  113. memset(sql_select, 0, 128);
  114. sprintf(sql_select, "vacuum");
  115. if (sqlite3_exec((sqlite3*)handle, sql_select, NULL, NULL, NULL) != SQLITE_OK)
  116. {
  117. log_info("free memory exec failed\n");
  118. return -1;
  119. }
  120. return 0;
  121. }
  122. void tbmqtt_cache_close(void* handle)
  123. {
  124. sqlite3_close((sqlite3*)handle);
  125. }
  126. static int callback_read_one_item(void* para, int ncolumn, char** columnvalue, char* columnname[])
  127. {
  128. struct tbmqtt_cache_sqlite_data *stTemp = (struct tbmqtt_cache_sqlite_data *)para;
  129. tbmqtt_ringbuffer_element_t* e = (tbmqtt_ringbuffer_element_t *)(stTemp->data);
  130. int i = 0;
  131. for (i = 0; i < ncolumn; i++) {
  132. if (strcmp("rowid", (const char*)columnname[i]) == 0) {
  133. *(stTemp->rowid) = atoi(columnvalue[i]);
  134. }
  135. else if (strcmp("topic", (const char*)columnname[i]) == 0) {
  136. strcpy(e->sztopic, columnvalue[i]);
  137. }
  138. else if (strcmp("payload", (const char*)columnname[i]) == 0) {
  139. strcpy(e->szpayload, columnvalue[i]);
  140. }
  141. }
  142. return 0;
  143. }
  144. static int callback_count(void* para, int ncolumn, char** columnvalue, char* columnname[])
  145. {
  146. if (ncolumn > 0) {
  147. *((int*)para) = atoi(columnvalue[0]);
  148. return 0;
  149. }
  150. else {
  151. return -1;
  152. }
  153. }
  154. static int callback_pageSize(void* para, int ncolumn, char** columnvalue, char* columnname[])
  155. {
  156. if (ncolumn > 0) {
  157. *((int*)para) = atoi(columnvalue[0]);
  158. return 0;
  159. }
  160. else {
  161. return -1;
  162. }
  163. }
  164. static int callback_pageCount(void* para, int ncolumn, char** columnvalue, char* columnname[])
  165. {
  166. if (ncolumn > 0) {
  167. *((int*)para) = atoi(columnvalue[0]);
  168. return 0;
  169. }
  170. else {
  171. return -1;
  172. }
  173. }