#include "plt.h"

/* Module state: upload timers/intervals, lock state and feature flags. */
struct tb_t tb;

/* Current Unix time, in seconds. */
static long long tb_get_unixts(void)
{
    return (long long)time(NULL);
}

/*
 * Telemetry timestamp, in milliseconds.
 * The Unix time is shifted by tbmqtt_get_tz() hours before conversion to ms.
 */
static long long tb_get_ts(void)
{
    return (tb_get_unixts() + tbmqtt_get_tz() * (long long)3600) * (long long)1000;
    // return tb_get_unixts()*1000;
}

/*
 * Queue the reply to a server-side RPC request.
 *
 * request_topic:    topic the request arrived on; the text after its last '/'
 *                   is used as the request id.
 * response_payload: NUL-terminated reply body.
 *
 * Returns 0 when the reply was queued or when the topic contains no '/'
 * (nothing is sent in that case), -1 when the topic or payload does not fit
 * into the queue element.
 */
static int tb_send_rpc_response(char *request_topic, char *response_payload)
{
    tbmqtt_ringbuffer_element_t e;
    const char *req_id = NULL;
    char *slash = NULL;
    int n = 0;

    slash = strrchr(request_topic, '/');
    if (slash == NULL)
    {
        return 0;
    }
    req_id = slash + 1;

    e.cmd = CMD_MQTT_SENDKV;
    e.sztopic[0] = 0;
    e.szpayload[0] = 0;

    /* Bounded copies: both strings originate from the network. */
    n = snprintf(e.szpayload, sizeof(e.szpayload), "%s", response_payload);
    if (n < 0 || (size_t)n >= sizeof(e.szpayload))
    {
        log_dbg("%s, response payload too long, dropped", __func__);
        return -1;
    }

    n = snprintf(e.sztopic, sizeof(e.sztopic), "v1/devices/me/rpc/response/%s", req_id);
    if (n < 0 || (size_t)n >= sizeof(e.sztopic))
    {
        log_dbg("%s, response topic too long, dropped", __func__);
        return -1;
    }

    tbmqtt_lock_txbuf();
    tbmqtt_queue_txbuf(e);
    tbmqtt_unlock_txbuf();

    return 0;
}

/*
 * Tick an interval counter.
 *
 * timer: counter owned by the caller; post-incremented on every call with a
 *        positive invt, and reset to 0 when the interval elapses.
 * invt:  interval in ticks; values <= 0 disable the check (timer untouched).
 *
 * Returns true when the counter value before the increment was greater than
 * invt, false otherwise.
 */
bool tb_check_invt(int *timer, const int invt)
{
    if (invt <= 0)
    {
        return false;
    }

    if ((*timer)++ > invt)
    {
        *timer = 0;
        return true;
    }

    return false;
}

/*
 * Queue a telemetry message on "v1/devices/me/telemetry".
 *
 * data: comma-separated key/value pairs without the enclosing braces; it is
 *       wrapped as {'ts':<ms>,'values':{<data>}}.
 *
 * A message that does not fit into the queue element is dropped (and logged)
 * rather than sent truncated.
 */
void tb_send_telemetry(const char *data)
{
    tbmqtt_ringbuffer_element_t e;
    int n = 0;

    e.sztopic[0] = 0;
    e.szpayload[0] = 0;
    e.cmd = CMD_MQTT_SENDKV;
    strcpy(e.sztopic, "v1/devices/me/telemetry");

    n = snprintf(e.szpayload, sizeof(e.szpayload), "{'ts':%lld,'values':{%s}}", tb_get_ts(), data);
    if (n < 0 || (size_t)n >= sizeof(e.szpayload))
    {
        log_dbg("%s, telemetry payload too long, dropped", __func__);
        return;
    }

    // log_dbg("%s, e.szpayload : %s", __func__, e.szpayload);
    // if (cJSON_Parse(e.szpayload) == NULL)
    // {
    //     log_err("%s, invalid json string : %s", __func__, data);
    //     return;
    // }
    // else
    // {
    // }

#ifdef DEBUG_MQTT
    printf("%s: ready to send payload:%s\n", __func__, data);
#endif

    tbmqtt_lock_txbuf();
    tbmqtt_queue_txbuf(e);
    tbmqtt_unlock_txbuf();
}

/*
 * Queue a client attribute update on "v1/devices/me/attributes".
 *
 * payload: comma-separated key/value pairs without the enclosing braces.
 *
 * Returns 0 when queued, -1 when the payload does not fit.
 */
static int tb_update_attributes(char *payload)
{
    tbmqtt_ringbuffer_element_t e;
    int n = 0;

    e.cmd = CMD_MQTT_SENDKV;
    e.sztopic[0] = 0;
    e.szpayload[0] = 0;

    n = snprintf(e.szpayload, sizeof(e.szpayload), "{%s}", payload);
    if (n < 0 || (size_t)n >= sizeof(e.szpayload))
    {
        log_dbg("%s, attribute payload too long, dropped", __func__);
        return -1;
    }
    strcpy(e.sztopic, "v1/devices/me/attributes");

    log_dbg("%s, topic:%s, payload:%s", __func__, e.sztopic, e.szpayload);

    tbmqtt_lock_txbuf();
    tbmqtt_queue_txbuf(e);
    tbmqtt_unlock_txbuf();

    return 0;
}

/* Send the static parameters once each time tb.param_en is raised. */
static void tb_upload_param(void)
{
    char tb_buf[4096] = {0};

    if (tb.param_en > 0)
    {
        tb.param_en = 0;

        /* ctn data is sent directly to the corresponding device; no idx needed */
        snprintf(tb_buf, sizeof(tb_buf), "'sys_sw_version':'%s', 'norm_cap':%d,'norm_pow':%d",
                 plt_get_version_string(), ctn_get_norm_cap(), ctn_get_norm_pow());
        tb_send_telemetry(tb_buf);
    }
}

/* System telemetry: state/error summary on sys_intv[0], machine data on sys_intv[1]. */
static void tb_upload_sys(void)
{
    char tb_buf[4096] = {0};

    if (tb_check_invt(&tb.sys_timer[0], tb.sys_intv[0]))
    {
        tb_buf[0] = 0;
        /* sys state and err */
        snprintf(tb_buf, sizeof(tb_buf),
                 "'state':'%s','err':'%s','ap':%d,'soc':%.1f,'ems_state':'%s','host':'%s'",
                 ctn_get_state_str(), ctn_get_err_str(), ctn_get_ap(), ctn_get_soc(),
                 ems_get_state_str(), ctn_get_hostname());
        tb_send_telemetry(tb_buf);
    }

    /* 300 seconds period */
    if (tb_check_invt(&tb.sys_timer[1], tb.sys_intv[1]))
    {
        /* cpu mem disk */
        tb_buf[0] = 0;
        mac_get_tbmqtt_data(tb_buf);
        tb_send_telemetry(tb_buf);
    }
}

/* PCS telemetry on three intervals; skipped while PCS communication is not normal. */
static void tb_upload_pcs(void)
{
    if (pcs_get_comm_st() != COMMST_NORMAL)
    {
        return;
    }

    char tb_buf[4096] = {0};

    if (tb_check_invt(&tb.pcs_timer[0], tb.pcs_intv[0]))
    {
        tb_buf[0] = 0;
        pcs_get_tbmqtt_data_interval_30s(tb_buf);
        tb_send_telemetry(tb_buf);
    }

    if (tb_check_invt(&tb.pcs_timer[1], tb.pcs_intv[1]))
    {
        tb_buf[0] = 0;
        pcs_get_tbmqtt_data_interval_60s(tb_buf);
        tb_send_telemetry(tb_buf);
    }

    if (tb_check_invt(&tb.pcs_timer[2], tb.pcs_intv[2]))
    {
        tb_buf[0] = 0;
        pcs_get_tbmqtt_data_interval_half_hour(tb_buf);
        tb_send_telemetry(tb_buf);
    }
}

/* Battery pack telemetry on four intervals; skipped while pack communication is not normal. */
static void tb_upload_pack(void)
{
    char tb_buf[8192] = {0};

    if (pack_get_comm_st() != COMMST_NORMAL)
    {
        return;
    }

    if (tb_check_invt(&tb.pack_timer[0], tb.pack_intv[0]))
    {
        tb_buf[0] = 0;
        pack_get_tbmqtt_data_inteval_30s(tb_buf);
        tb_send_telemetry(tb_buf);
    }

    if (tb_check_invt(&tb.pack_timer[1], tb.pack_intv[1]))
    {
        tb_buf[0] = 0;
        pack_get_tbmqtt_data_inteval_60s(tb_buf);
        tb_send_telemetry(tb_buf);
    }
#ifdef DEBUG_MQTT
    else
    {
        log_dbg("timer :%d, intv :%d", tb.pack_timer[1], tb.pack_intv[1]);
    }
#endif

    int model = pack_get_model();

    if (tb_check_invt(&tb.pack_timer[2], tb.pack_intv[2]))
    {
        /* cell voltage */
        tb_buf[0] = 0;
        pack_get_tbmqtt_data_inteval_level3(tb_buf);
        tb_send_telemetry(tb_buf);

        /* Advance the sub-index and wrap back to 1 past the model's last unit. */
        tb.cellt_sub_idx++;
        if ((model == DEVM_CATL280A && tb.cellt_sub_idx > BMU_NBR) ||
            (model == DEVM_KL && tb.cellt_sub_idx > KL_CLUSTER_NBR))
        {
            tb.cellt_sub_idx = 1;
        }
    }
#ifdef DEBUG_MQTT
    else
    {
        log_dbg("timer :%d, intv :%d", tb.pack_timer[2], tb.pack_intv[2]);
    }
#endif

    if (tb_check_invt(&tb.pack_timer[3], tb.pack_intv[3]))
    {
        /* cell temperature */
        tb_buf[0] = 0;
        pack_get_tbmqtt_data_inteval_300s(tb_buf);
        tb_send_telemetry(tb_buf);
        // tb.cellt_sub_idx++;
        // if ((model == DEVM_CATL280A && tb.cellt_sub_idx > BMU_NBR) || (model == DEVM_KL && tb.cellt_sub_idx > KL_CLUSTER_NBR))
        // {
        //     tb.cellt_sub_idx = 1;
        // }
    }
}

/* Telemetry from ac_get_tbmqtt_data() every ac_intv[0] ticks. */
static void tb_upload_ac(void)
{
    char tb_buf[4096] = {0};

    if (tb_check_invt(&tb.ac_timer[0], tb.ac_intv[0]))
    {
        ac_get_tbmqtt_data(tb_buf);
        tb_send_telemetry(tb_buf);
    }
}

/* Telemetry from meter_get_tbmqtt_data() every meter_intv[0] ticks. Always returns 0. */
static int tb_upload_meter(void)
{
    char tb_buf[4096] = {0};

    if (tb_check_invt(&tb.meter_timer[0], tb.meter_intv[0]))
    {
        meter_get_tbmqtt_data(tb_buf);
        tb_send_telemetry(tb_buf);
    }

    return 0;
}

/* Telemetry from env_get_tbmqtt_data() every env_intv[0] ticks. */
static void tb_upload_env(void)
{
    char tb_buf[4096] = {0};

    if (tb_check_invt(&tb.env_timer[0], tb.env_intv[0]))
    {
        env_get_tbmqtt_data(tb_buf);
        tb_send_telemetry(tb_buf);
    }
}

/* Telemetry from dehumi_get_tbmqtt_data() every dehumi_intv[0] ticks. */
static void tb_upload_dehumi(void)
{
    char tb_buf[4096] = {0};

    if (tb_check_invt(&tb.dehumi_timer[0], tb.dehumi_intv[0]))
    {
        dehumi_get_tbmqtt_data(tb_buf);
        tb_send_telemetry(tb_buf);
    }
}

/*
 * Telemetry from fe_get_tbmqtt_data(): sent when fe_intv[0] elapses or as
 * soon as the data differs from the last message sent.
 */
static void tb_upload_fe(void)
{
    char tb_buf[4096] = {0};
    static char tb_buf_cache[4096] = {0};

    fe_get_tbmqtt_data(tb_buf);

    /* The change test belongs outside tb_check_invt(), as in tb_upload_dido(). */
    if (tb_check_invt(&tb.fe_timer[0], tb.fe_intv[0]) || strcmp(tb_buf, tb_buf_cache) != 0)
    {
        tb_send_telemetry(tb_buf);
        strcpy(tb_buf_cache, tb_buf);
    }
}

/*
 * Telemetry from fa_get_tbmqtt_data(): sent when fa_intv[0] elapses or as
 * soon as the data differs from the last message sent.
 */
static void tb_upload_fa(void)
{
    char tb_buf[4096] = {0};
    static char tb_buf_cache[4096] = {0};

    fa_get_tbmqtt_data(tb_buf);

    /* The change test belongs outside tb_check_invt(), as in tb_upload_dido(). */
    if (tb_check_invt(&tb.fa_timer[0], tb.fa_intv[0]) || strcmp(tb_buf, tb_buf_cache) != 0)
    {
        tb_send_telemetry(tb_buf);
        strcpy(tb_buf_cache, tb_buf);
    }
}

/* Telemetry from plc_get_tbmqtt_data() every plc_intv[0] ticks. */
static void tb_upload_plc(void)
{
    char tb_buf[4096] = {0};

    if (tb_check_invt(&tb.plc_timer[0], tb.plc_intv[0]))
    {
        plc_get_tbmqtt_data(tb_buf);
        tb_send_telemetry(tb_buf);
    }
}

/* UPS telemetry every ups_intv[0] ticks; skipped while UPS communication is not normal. */
static void tb_upload_ups(void)
{
    char tb_buf[4096] = {0};

    if (ups_get_comm_state() != COMMST_NORMAL)
    {
        return;
    }

    if (tb_check_invt(&tb.ups_timer[0], tb.ups_intv[0]))
    {
        ups_get_tbmqtt_data(tb_buf);
        tb_send_telemetry(tb_buf);
    }
}

/* ctn_cal telemetry every ctn_cal_intv[0] ticks; only while its state is SMST_RUN. */
static void tb_upload_ctn_cal(void)
{
    char tb_buf[4096] = {0};

    if (ctn_cal_get_state() != SMST_RUN)
    {
        return;
    }

    if (tb_check_invt(&tb.ctn_cal_timer[0], tb.ctn_cal_intv[0]))
    {
        ctn_cal_get_tb_data(tb_buf);
        tb_send_telemetry(tb_buf);
    }
}

/*
 * Telemetry from dido_get_tbmqtt_data(): sent when dido_intv[0] elapses or as
 * soon as the data differs from the last message sent.
 */
static void tb_upload_dido(void)
{
    char tb_buf[4096] = {0};
    static char tb_buf_cache[4096] = {0};

    dido_get_tbmqtt_data(tb_buf);

    if (tb_check_invt(&tb.dido_timer[0], tb.dido_intv[0]) || strcmp(tb_buf, tb_buf_cache) != 0)
    {
        tb_send_telemetry(tb_buf);
        strcpy(tb_buf_cache, tb_buf);
    }
}

/* Telemetry from ctn_get_tbmqtt_data() every ctn_intv[0] ticks. */
static void tb_upload_ctn(void)
{
    char tb_buf[4096] = {0};

    if (tb_check_invt(&tb.ctn_timer[0], tb.ctn_intv[0]))
    {
        ctn_get_tbmqtt_data(tb_buf);
        tb_send_telemetry(tb_buf);
    }
}

/* Returns the upload_enable flag (non-zero when periodic uploads are on). */
int tb_is_upload_enable(void)
{
    return tb.upload_enable;
}

/* Turn periodic uploads on. Always returns 0. */
int tb_enable_upload(void)
{
    tb.upload_enable = 1;
    return 0;
}

/* Turn periodic uploads off. Always returns 0. */
int tb_disable_upload(void)
{
    tb.upload_enable = 0;
    return 0;
}

/*
 * Load the default upload intervals and lock settings, then initialise the
 * MQTT layer. A tbmqtt_init() failure is only logged; the function returns 0
 * in either case.
 */
int tb_init(void)
{
    struct tb_t *dev = &tb;
    int ret = 0;

    dev->upload_enable = 1;
    dev->param_en = 0;

    dev->sys_intv[0] = 60;
    dev->sys_intv[1] = 300;

    dev->pcs_intv[0] = 30;
    dev->pcs_intv[1] = 60;
    dev->pcs_intv[2] = 60 * 30;

    dev->cellv_sub_idx = 1;
    dev->cellt_sub_idx = 1;
    dev->pack_intv[0] = 30;
    dev->pack_intv[1] = 60;
    dev->pack_intv[2] = 180;
    dev->pack_intv[3] = 300;
    // tb_set_cellv_lowspeed();

    dev->ac_intv[0] = 60;
    dev->meter_intv[0] = 60;
    dev->env_intv[0] = 60;
    dev->dehumi_intv[0] = 60;
    dev->fe_intv[0] = 60;
    dev->fa_intv[0] = 60;
    dev->plc_intv[0] = 60;
    dev->ups_intv[0] = 60;
    dev->ctn_cal_intv[0] = 60;
    dev->dido_intv[0] = 30;
    dev->ctn_intv[0] = 60;

    dev->tb_init = 1;
    dev->tb_lock_timer = 0;
    dev->tb_lock = 1;
    dev->tb_lock_intv = 15;

    if (tbmqtt_init() != 0)
    {
        log_dbg("%s, tbmqtt init fail", __func__);
    }

    log_dbg("%s, ret:%d", __func__, ret);
    return ret;
}

/* Use the fast cell-voltage interval. Always returns 0. */
int tb_set_cellv_highspeed(void)
{
    struct tb_t *dev = &tb;

    dev->pack_intv[2] = 6; /* cell v */
    /* 6 = high: 6*10=60s=1min for all 10 cabinet */
    return 0;
}

/* Use the slow cell-voltage interval. Always returns 0. */
int tb_set_cellv_lowspeed(void)
{
    struct tb_t *dev = &tb;

    dev->pack_intv[2] = 60; /* cell v */
    /* 60 = low: 60*10=600s=10min for all 10 cabinet */
    return 0;
}

/* Active-power setpoint staged by "set_aps" until an "*active_aps" command applies it. */
static int tmpaps_received = 0;
static int tmpaps;

/*
 * Housekeeping for the lock state plus handling of at most one received RPC
 * request per call.
 *
 * - On the first call after tb_init(), requests a parameter upload and
 *   publishes 'TbLock':true.
 * - While unlocked, counts ticks and re-locks after tb_lock_intv ticks.
 * - Dequeues one message from the rx buffer, parses it as JSON and dispatches
 *   on its "method" string. Recognised control commands reset the lock timer;
 *   unknown commands leave it unchanged.
 */
static void tb_proc_recv(void)
{
    struct tb_t *dev = &tb;
    cJSON *root = NULL;
    cJSON *item = NULL;
    const char *method = NULL;
    char reply[32];
    int rc;
    tbmqtt_ringbuffer_element_t e;

    if (dev->tb_init == 1)
    {
        dev->param_en = 1;
        dev->tb_init = 0;
        tb_update_attributes("'TbLock':true");
    }

    if ((dev->tb_lock == 0) && (dev->tb_lock_timer++ >= dev->tb_lock_intv))
    {
        dev->tb_lock_timer = 0;
        dev->tb_lock = 1;
        tb_update_attributes("'TbLock':true");
    }

    tbmqtt_lock_rxbuf();
    rc = tbmqtt_dequeue_rxbuf(&e);
    tbmqtt_unlock_rxbuf();
    if (rc != 1)
    {
        return;
    }

    log_dbg("%s, get topic:%s", __func__, e.sztopic);
    log_dbg("%s, and payload:%s", __func__, e.szpayload);

    root = cJSON_Parse(e.szpayload);
    if (!root)
    {
        log_dbg("%s, cJSON_Parse null, payload : %s", __func__, e.szpayload);
        return;
    }

    item = cJSON_GetObjectItem(root, "method");
    if (!item)
    {
        log_dbg("%s, cJSON_GetObjectItem method fail", __func__);
    }
    else if (item->valuestring == NULL)
    {
        /* "method" exists but is not a JSON string: nothing to compare against. */
        log_dbg("%s, method is not a string", __func__);
    }
    else
    {
        method = item->valuestring;
        log_dbg("%s, get method:%s", __func__, method);

        if (strcmp(method, "set_lock") == 0)
        {
            item = cJSON_GetObjectItem(root, "params");
            if (!item || item->valuestring == NULL)
            {
                log_dbg("%s, set_lock failed to get params ", __func__);
            }
            else
            {
                log_dbg("%s, set_lock, value :%s", __func__, item->valuestring);
                if (strcmp(item->valuestring, "lock") == 0)
                {
                    dev->tb_lock = 1;
                }
                else
                {
                    dev->tb_lock = 0;
                    dev->tb_lock_timer = 0;
                }
            }
        }
        else if (strcmp(method, "get_lock") == 0)
        {
            snprintf(reply, sizeof(reply), "%d", dev->tb_lock);
            tb_send_rpc_response(e.sztopic, reply);
        }
        else if (strcmp(method, "set_debug") == 0)
        {
            item = cJSON_GetObjectItem(root, "params");
            if (!item || item->valuestring == NULL)
            {
                log_dbg("%s, set_debug failed to get params ", __func__);
            }
            else
            {
                log_dbg("%s, set_debug, value :%s", __func__, item->valuestring);
                if (strcmp(item->valuestring, "debug") == 0)
                {
                    ems_set_debug_mode(1);
                }
                else
                {
                    ems_set_debug_mode(0);
                }
            }
        }
        else if (strcmp(method, "get_debug") == 0)
        {
            snprintf(reply, sizeof(reply), "%d", ems_get_debug_mode());
            tb_send_rpc_response(e.sztopic, reply);
        }
        else if (strcmp(method, "get_aps") == 0)
        {
            snprintf(reply, sizeof(reply), "%d", ess_get_aps());
            tb_send_rpc_response(e.sztopic, reply);
        }
        else
        {
            /* Lock enforcement is switched off here: the tb_lock test is replaced by a constant. */
            // if (dev->tb_lock)
            if (0)
            {
                log_dbg("%s, thingsboard is locked, do nothing", __func__);
            }
            else
            {
                /* Value written back to tb_lock_timer: 0 for known commands, unchanged otherwise. */
                int clearTimer = 0;

                if (strcmp(method, "stdby") == 0)
                {
                    ess_send_sm_cmd(CMD_SM_STDBY);
                }
                else if (strcmp(method, "stop") == 0)
                {
                    ess_send_sm_cmd(CMD_SM_STOP);
                }
                else if (strcmp(method, "ready") == 0)
                {
                    ess_send_sm_cmd(CMD_SM_READY);
                }
                else if (strcmp(method, "offgrid") == 0)
                {
                    ess_send_sm_cmd(CMD_SM_OFFGRID);
                }
                else if (strcmp(method, "active_aps") == 0)
                {
                    if (tmpaps_received == 1)
                    {
                        ess_set_aps(tmpaps);
                        tmpaps_received = 0;
                    }
                }
                else if (strcmp(method, "set_aps") == 0)
                {
                    item = cJSON_GetObjectItem(root, "params");
                    if (!item)
                    {
                        log_dbg("%s, cJSON_GetObjectItem params fail", __func__);
                    }
                    else
                    {
                        // ess_set_aps(item->valueint);
                        log_dbg("%s, aps:%d", __func__, item->valueint);
                        tmpaps = item->valueint;
                        tmpaps_received = 1;
                    }
                }
                else if (strcmp(method, "fe_start") == 0) /* fire-suppression discharge */
                {
                    item = cJSON_GetObjectItem(root, "params");
                    if (item)
                    {
                        fe_set_start(item->valueint);
                    }
                }
                else if (strcmp(method, "bms_ctrl_pwrup") == 0)
                {
                    plc_bms_ctrl_pwrup();
                }
                else if (strcmp(method, "pack_stop") == 0)
                {
                    pack_send_sm_cmd(CMD_SM_STOP);
                }
                else if (strcmp(method, "pack_ready") == 0)
                {
                    pack_send_sm_cmd(CMD_SM_READY);
                }
                else if (strcmp(method, "pcs_stop") == 0)
                {
                    pcs_send_sm_cmd(CMD_SM_STOP);
                }
                else if (strcmp(method, "pcs_ready") == 0)
                {
                    pcs_send_sm_cmd(CMD_SM_READY);
                }
                else if (strcmp(method, "bms_ctrl_pwrup_without_shielding_signal") == 0)
                {
                    plc_bms_ctrl_pwrup_without_shielding_signal();
                }
                else if (strcmp(method, "pcs_dev_start") == 0)
                {
                    /* NOTE: both waits below block this function, with no timeout. */
                    gt_comm_set_dev_hv_on(1);
                    while (gt_get_hv_status(1) != BMSHV_ON)
                    {
                        sleep(1);
                    }
                    n9_set_dev_stopcmd(1);
                    n9_set_dev_resetcmd(1);
                    n9_set_dev_startcmd(1);
                    while (n9_get_runstat(1) != N9_RUNSTAT_IDLE)
                    {
                        sleep(1);
                    }
                    n9_set_dev_aps(1, 0);
                    // pcs_set_dev_startcmd();
                }
                else if (strcmp(method, "pcs_dev_stop") == 0)
                {
                    n9_set_dev_stopcmd(1);
                    gt_comm_set_dev_hv_off(1);
                    // pcs_set_dev_stopcmd();
                }
                else if (strcmp(method, "pcs_dev_reset") == 0)
                {
                    n9_set_dev_resetcmd(1);
                    // pcs_set_dev_resetcmd();
                }
                else if (strcmp(method, "pcs_dev_active_aps") == 0)
                {
                    if (tmpaps_received == 1)
                    {
                        n9_set_dev_aps(1, tmpaps);
                        // pcs_set_dev_aps(tmpaps);
                        tmpaps_received = 0;
                    }
                }
                else if (strcmp(method, "dido_fire_confirm") == 0)
                {
                    dido_fire_confirm(1, 1);
                }
                else if (strcmp(method, "dido_fire_cancle") == 0)
                {
                    fe_set_start(-1);
                    sleep(1);
                    dido_fire_confirm(1, 0);
                }
                else if (strcmp(method, "dido_fan_start") == 0)
                {
                    dido_fan_switch(1, 1);
                }
                else if (strcmp(method, "dido_fan_stop") == 0)
                {
                    dido_fan_switch(1, 0);
                }
                else if (strcmp(method, "dido_alarm_on") == 0)
                {
                    dido_sys_alarm(1, 1);
                }
                else if (strcmp(method, "dido_alarm_off") == 0)
                {
                    dido_sys_alarm(1, 0);
                }
                else
                {
                    clearTimer = dev->tb_lock_timer;
                    log_dbg("%s, unknown cmd : %s", __func__, method);
                }

                dev->tb_lock_timer = clearTimer;
            }
        }
    }

    cJSON_Delete(root);
}

/*
 * Periodic entry point: process one received request, then run the upload
 * routines while uploads are enabled.
 * tb_upload_plc(), tb_upload_ups() and tb_upload_ctn_cal() are defined in
 * this file but are not invoked here.
 */
void tb_exe(void)
{
    tb_proc_recv();

    if (tb_is_upload_enable())
    {
        tb_upload_param();
        tb_upload_sys();
        tb_upload_ctn();
        tb_upload_pcs();
        tb_upload_pack();
        tb_upload_ac();
        tb_upload_meter();
        tb_upload_env();
        tb_upload_dehumi();
        tb_upload_fe();
        tb_upload_dido();
        tb_upload_fa();
    }
}

/* Set the param_en flag; a value > 0 triggers one parameter upload. Always returns 0. */
int tb_set_param_en(int val)
{
    tb.param_en = val;
    return 0;
}

/* The accessors below forward directly to the tbmqtt layer. */

char *tb_get_state_str(void)
{
    return tbmqtt_get_state_str();
}

int tb_get_stp(void)
{
    return tbmqtt_get_stp();
}

char *tb_get_err_str(void)
{
    return tbmqtt_get_err_str();
}

int tb_get_tick(void)
{
    return tbmqtt_get_tick();
}

double tb_get_timing_ave(void)
{
    return tbmqtt_get_timing_ave();
}

double tb_get_timing_cur(void)
{
    return tbmqtt_get_timing_cur();
}

double tb_get_timing_max(void)
{
    return tbmqtt_get_timing_max();
}

int tb_get_enable(void)
{
    return tbmqtt_get_enable();
}

char *tb_get_servip_str(void)
{
    return tbmqtt_get_servip_str();
}

int tb_get_servport(void)
{
    return tbmqtt_get_servport();
}

char *tb_get_client_id(void)
{
    return tbmqtt_get_client_id();
}

double tb_get_txbuf_usage(void)
{
    return tbmqtt_get_txbuf_usage();
}

char *tb_get_access_token(void)
{
    return tbmqtt_get_access_token();
}

int tb_get_tz(void)
{
    return tbmqtt_get_tz();
}

/*
 * Build the text block for this module: a header line, the text produced by
 * tbmqtt_get_tool_data(), then the upload/param enable flags.
 *
 * buf: destination supplied by the caller; its size is not passed in, so it
 *      must be large enough for all three parts.
 *
 * Always returns 0.
 */
int tb_get_tool_data(char *buf)
{
    struct tb_t *t = &tb;
    char buf_temp[1024];

    sprintf(buf, "" REVERSE " TB " NONE " \n");

    /* NOTE(review): tbmqtt_get_tool_data() must not write more than sizeof(buf_temp) bytes. */
    tbmqtt_get_tool_data(buf_temp);
    strcat(buf, buf_temp);

    sprintf(buf_temp, " upload_en:%d param_en:%d\n", t->upload_enable, t->param_en);
    strcat(buf, buf_temp);

    return 0;
}