diff options
author | V.Krishn <vkrishn4@gmail.com> | 2019-12-03 20:09:36 +0530 |
---|---|---|
committer | V.Krishn <vkrishn4@gmail.com> | 2019-12-03 20:09:36 +0530 |
commit | c2a187671226dbbf0b62037c868a1e64d76036a0 (patch) | |
tree | c3c3c1bc89bf3ed96a38fd518cc481ecc221602b | |
parent | d84fb197032220545590171d0506880aa2659cef (diff) | |
download | mqtt-dirpub-c2a187671226dbbf0b62037c868a1e64d76036a0.tar.bz2 |
move more functions to sub_client_output.c
clean sub_client.c
clean code
update to mosquitto-v1.5.9
-rw-r--r-- | client_shared.c | 6 | ||||
-rw-r--r-- | client_shared.h | 3 | ||||
-rw-r--r-- | sub_client.c | 144 | ||||
-rw-r--r-- | sub_client_output.c | 100 |
4 files changed, 109 insertions, 144 deletions
diff --git a/client_shared.c b/client_shared.c index 17b6dad..7c9bd52 100644 --- a/client_shared.c +++ b/client_shared.c @@ -1,5 +1,5 @@ /* -Copyright (c) 2014-2018 Roger Light <roger@atchoo.org> +Copyright (c) 2014-2019 Roger Light <roger@atchoo.org> Copyright (c) 2015-2019 V.Krishn <vkrishn@insteps.net> All rights reserved. This program and the accompanying materials @@ -134,7 +134,6 @@ void init_config(struct mosq_config *cfg) cfg->isfmask = false; cfg->overwrite = false; - } void client_config_cleanup(struct mosq_config *cfg) @@ -187,11 +186,10 @@ void client_config_cleanup(struct mosq_config *cfg) free(cfg->socks5_username); free(cfg->socks5_password); #endif - + //free(cfg->ffmask); //free(cfg->ftoken); free(cfg->fmask_topic); - } int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[]) diff --git a/client_shared.h b/client_shared.h index cfc0bb5..9e73b0c 100644 --- a/client_shared.h +++ b/client_shared.h @@ -1,5 +1,5 @@ /* -Copyright (c) 2014-2018 Roger Light <roger@atchoo.org> +Copyright (c) 2014-2019 Roger Light <roger@atchoo.org> Copyright (c) 2015-2019 V.Krishn <vkrishn@insteps.net> All rights reserved. This program and the accompanying materials @@ -104,7 +104,6 @@ struct mosq_config { char *fmask_topic; char *nodesuffix; char nsuffix[16]; /* limit 16 bytes. */ - }; int client_config_load(struct mosq_config *config, int pub_or_sub, int argc, char *argv[]); diff --git a/sub_client.c b/sub_client.c index a00171e..ca78fd2 100644 --- a/sub_client.c +++ b/sub_client.c @@ -1,5 +1,5 @@ /* -Copyright (c) 2009-2018 Roger Light <roger@atchoo.org> +Copyright (c) 2009-2019 Roger Light <roger@atchoo.org> Copyright (c) 2013-2019 V.Krishn <vkrishn@insteps.net> All rights reserved. This program and the accompanying materials @@ -24,18 +24,14 @@ Contributors: #include <stdlib.h> #include <string.h> #include <time.h> -#include <libgen.h> /* dirname, basename */ #ifndef WIN32 #include <unistd.h> #include <signal.h> #else -#include <sysstat.h> #include <process.h> #include <winsock2.h> #define snprintf sprintf_s #endif -#include <sys/stat.h> -#include <sys/types.h> #include <mosquitto.h> #include "client_shared.h" @@ -55,129 +51,8 @@ void my_signal_handler(int signum) #endif void print_message(struct mosq_config *cfg, const struct mosquitto_message *message); -int mkpath(const char *path, mode_t mode); -void _fmask(char *fmask, void *obj, const struct mosquitto_message *message); +void print_message_file(struct mosq_config *cfg, const struct mosquitto_message *message); -/* -File open with given mode. -returns file descriptor (fd) -*/ -/* ------------------------------------------------------------- */ -FILE *_mosquitto_fopen(const char *path, const char *mode) -{ -#ifdef WIN32 - char buf[MAX_PATH]; - int rc; - rc = ExpandEnvironmentStrings(path, buf, MAX_PATH); - if(rc == 0 || rc == MAX_PATH) { - return NULL; - }else { - return fopen(buf, mode); - } -#else - return fopen(path, mode); -#endif -} - -void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) -{ - struct mosq_config *cfg; - int i; - bool res; - - if(process_messages == false) return; - - assert(obj); - cfg = (struct mosq_config *)obj; - - if(cfg->retained_only && !message->retain && process_messages){ - process_messages = false; - mosquitto_disconnect(mosq); - return; - } - - if(message->retain && cfg->no_retain) return; - if(cfg->filter_outs){ - for(i=0; i<cfg->filter_out_count; i++){ - mosquitto_topic_matches_sub(cfg->filter_outs[i], message->topic, &res); - if(res) return; - } - } - cfg->fmask_topic = message->topic; - - FILE *fptr = NULL; - - if(cfg->format == NULL && strlen(cfg->fmask) >= 1) { - _fmask(cfg->fmask, cfg, message); - } - if(cfg->format && strlen(cfg->fmask) == 0) { /* experimental */ - _fmask(cfg->format, cfg, message); - } - - char *path, *prog; - path = dirname(strdup(cfg->ffmask)); - prog = basename(strdup(cfg->ffmask)); - - mkpath(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); - - /* reasonable method to distinguish between directory - * and a writable node (by default is off) */ - if(cfg->nodesuffix) { - char *sf = cfg->nsuffix; /* limit 16 bytes. */ - sf = stpcpy(sf, cfg->nodesuffix); - if(cfg->nsuffix) { - char *to = cfg->ffmask; - to = stpcpy(to, path); - to = stpcpy(to, "/"); - if(prog) { - to = stpcpy(to, prog); - } - to = stpcpy(to, "."); - to = stpcpy(to, cfg->nsuffix); - } - } - - if(cfg->overwrite) { - fptr = _mosquitto_fopen(cfg->ffmask, "w"); - } else { - fptr = _mosquitto_fopen(cfg->ffmask, "a"); - } - - if(!fptr){ - fprintf(stderr, "Error: cannot open outfile, using stdout - %s\n", cfg->ffmask); - // need to do normal stdout - //mosquitto_message_callback_set(mosq, "my_message_callback"); - } else{ - if(cfg->verbose){ - if(message->payloadlen){ - fprintf(fptr, "%s ", message->topic); - fwrite(message->payload, 1, message->payloadlen, fptr); - if(cfg->eol){ - fprintf(fptr, "\n"); - } - }else{ - if(cfg->eol){ - fprintf(fptr, "%s (null)\n", message->topic); - } - } - }else{ - if(message->payloadlen){ - fwrite(message->payload, 1, message->payloadlen, fptr); - if(cfg->eol){ - fprintf(fptr, "\n"); - } - } - } - fclose(fptr); - } - if(cfg->msg_count>0){ - msg_count++; - if(cfg->msg_count == msg_count){ - process_messages = false; - mosquitto_disconnect(mosq); - } - } -} void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) { @@ -204,7 +79,11 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit } } - print_message(cfg, message); + if(cfg->fmask){ + print_message_file(cfg, message); + }else{ + print_message(cfg, message); + } if(cfg->msg_count>0){ msg_count++; @@ -331,7 +210,7 @@ void print_usage(void) printf(" allowed masks are:\n"); printf(" @[epoch|date|year|month|day|datetime|hour|min|sec|id|topic[1-9]] \n"); printf(" eg. --fmask='@id@-@date@-@topic' for file id-20101221-topicname\n"); - printf(" NOTE: option -F (new in v1.5.0) does have any effect when used with --fmask\n"); + printf(" NOTE: enabled (experimental) use of option -F <value> with empty --fmask "" \n"); printf(" --nodesuffix : suffix for leaf/text node, when --fmask is provided\n"); printf(" --overwrite : overwrite the existing output file, can be used with --fmask only.\n"); printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n"); @@ -422,12 +301,7 @@ int main(int argc, char *argv[]) mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); } mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback); - - if(cfg.isfmask) { - mosquitto_message_callback_set(mosq, my_message_file_callback); - } else { - mosquitto_message_callback_set(mosq, my_message_callback); - } + mosquitto_message_callback_set(mosq, my_message_callback); rc = client_connect(mosq, &cfg); if(rc) return rc; diff --git a/sub_client_output.c b/sub_client_output.c index 878aafb..e52c184 100644 --- a/sub_client_output.c +++ b/sub_client_output.c @@ -1,5 +1,5 @@ /* -Copyright (c) 2009-2018 Roger Light <roger@atchoo.org> +Copyright (c) 2009-2019 Roger Light <roger@atchoo.org> Copyright (c) 2015-2019 V.Krishn <vkrishn@insteps.net> All rights reserved. This program and the accompanying materials @@ -24,6 +24,7 @@ Contributors: #include <stdlib.h> #include <string.h> #include <time.h> +#include <libgen.h> /* dirname, basename */ #ifndef WIN32 #ifdef HAVE_UNISTD_H #include <unistd.h> @@ -373,7 +374,7 @@ static int do_mkdir(const char *path, mode_t mode) ** each directory in path exists, rather than optimistically creating ** the last element and working backwards. */ -int mkpath(const char *path, mode_t mode) +static int mkpath(const char *path, mode_t mode) { char *pp; char *sp; @@ -548,7 +549,7 @@ static void _setfmask(char *token, void *obj) /* Expand --fmask string options for output filename. */ /* ------------------------------------------------------------- */ -void _fmask(char *fmask, void *obj, const struct mosquitto_message *message) +static void _fmask(char *fmask, void *obj, const struct mosquitto_message *message) { struct mosq_config *cfg; @@ -596,3 +597,96 @@ void _fmask(char *fmask, void *obj, const struct mosquitto_message *message) } +/* +File open with given mode. +returns file descriptor (fd) +*/ +/* ------------------------------------------------------------- */ +static FILE *_mosquitto_fopen(const char *path, const char *mode) +{ +#ifdef WIN32 + char buf[MAX_PATH]; + int rc; + rc = ExpandEnvironmentStrings(path, buf, MAX_PATH); + if(rc == 0 || rc == MAX_PATH) { + return NULL; + }else { + return fopen(buf, mode); + } +#else + return fopen(path, mode); +#endif +} + +void print_message_file(struct mosq_config *cfg, const struct mosquitto_message *message) +{ + + cfg->fmask_topic = message->topic; + + FILE *fptr = NULL; + + if(cfg->format == NULL && strlen(cfg->fmask) >= 1) { + _fmask(cfg->fmask, cfg, message); + } + if(cfg->format && strlen(cfg->fmask) == 0) { /* experimental */ + _fmask(cfg->format, cfg, message); + } + + char *path, *prog; + path = dirname(strdup(cfg->ffmask)); + prog = basename(strdup(cfg->ffmask)); + + mkpath(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH); + + /* reasonable method to distinguish between directory + * and a writable node (by default is off) */ + if(cfg->nodesuffix) { + char *sf = cfg->nsuffix; /* limit 16 bytes. */ + sf = stpcpy(sf, cfg->nodesuffix); + if(cfg->nsuffix) { + char *to = cfg->ffmask; + to = stpcpy(to, path); + to = stpcpy(to, "/"); + if(prog) { + to = stpcpy(to, prog); + } + to = stpcpy(to, "."); + to = stpcpy(to, cfg->nsuffix); + } + } + + if(cfg->overwrite) { + fptr = _mosquitto_fopen(cfg->ffmask, "w"); + } else { + fptr = _mosquitto_fopen(cfg->ffmask, "a"); + } + + if(!fptr){ + fprintf(stderr, "Error: cannot open outfile, using stdout - %s\n", cfg->ffmask); + // need to do normal stdout + //mosquitto_message_callback_set(mosq, "my_message_callback"); + } else{ + if(cfg->verbose){ + if(message->payloadlen){ + fprintf(fptr, "%s ", message->topic); + fwrite(message->payload, 1, message->payloadlen, fptr); + if(cfg->eol){ + fprintf(fptr, "\n"); + } + }else{ + if(cfg->eol){ + fprintf(fptr, "%s (null)\n", message->topic); + } + } + }else{ + if(message->payloadlen){ + fwrite(message->payload, 1, message->payloadlen, fptr); + if(cfg->eol){ + fprintf(fptr, "\n"); + } + } + } + fclose(fptr); + } + +} |