aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorV.Krishn <vkrishn4@gmail.com>2019-12-03 20:09:36 +0530
committerV.Krishn <vkrishn4@gmail.com>2019-12-03 20:09:36 +0530
commitc2a187671226dbbf0b62037c868a1e64d76036a0 (patch)
treec3c3c1bc89bf3ed96a38fd518cc481ecc221602b
parentd84fb197032220545590171d0506880aa2659cef (diff)
downloadmqtt-dirpub-c2a187671226dbbf0b62037c868a1e64d76036a0.tar.bz2
move more functions to sub_client_output.c
clean sub_client.c clean code update to mosquitto-v1.5.9
-rw-r--r--client_shared.c6
-rw-r--r--client_shared.h3
-rw-r--r--sub_client.c144
-rw-r--r--sub_client_output.c100
4 files changed, 109 insertions, 144 deletions
diff --git a/client_shared.c b/client_shared.c
index 17b6dad..7c9bd52 100644
--- a/client_shared.c
+++ b/client_shared.c
@@ -1,5 +1,5 @@
/*
-Copyright (c) 2014-2018 Roger Light <roger@atchoo.org>
+Copyright (c) 2014-2019 Roger Light <roger@atchoo.org>
Copyright (c) 2015-2019 V.Krishn <vkrishn@insteps.net>
All rights reserved. This program and the accompanying materials
@@ -134,7 +134,6 @@ void init_config(struct mosq_config *cfg)
cfg->isfmask = false;
cfg->overwrite = false;
-
}
void client_config_cleanup(struct mosq_config *cfg)
@@ -187,11 +186,10 @@ void client_config_cleanup(struct mosq_config *cfg)
free(cfg->socks5_username);
free(cfg->socks5_password);
#endif
-
+
//free(cfg->ffmask);
//free(cfg->ftoken);
free(cfg->fmask_topic);
-
}
int client_config_load(struct mosq_config *cfg, int pub_or_sub, int argc, char *argv[])
diff --git a/client_shared.h b/client_shared.h
index cfc0bb5..9e73b0c 100644
--- a/client_shared.h
+++ b/client_shared.h
@@ -1,5 +1,5 @@
/*
-Copyright (c) 2014-2018 Roger Light <roger@atchoo.org>
+Copyright (c) 2014-2019 Roger Light <roger@atchoo.org>
Copyright (c) 2015-2019 V.Krishn <vkrishn@insteps.net>
All rights reserved. This program and the accompanying materials
@@ -104,7 +104,6 @@ struct mosq_config {
char *fmask_topic;
char *nodesuffix;
char nsuffix[16]; /* limit 16 bytes. */
-
};
int client_config_load(struct mosq_config *config, int pub_or_sub, int argc, char *argv[]);
diff --git a/sub_client.c b/sub_client.c
index a00171e..ca78fd2 100644
--- a/sub_client.c
+++ b/sub_client.c
@@ -1,5 +1,5 @@
/*
-Copyright (c) 2009-2018 Roger Light <roger@atchoo.org>
+Copyright (c) 2009-2019 Roger Light <roger@atchoo.org>
Copyright (c) 2013-2019 V.Krishn <vkrishn@insteps.net>
All rights reserved. This program and the accompanying materials
@@ -24,18 +24,14 @@ Contributors:
#include <stdlib.h>
#include <string.h>
#include <time.h>
-#include <libgen.h> /* dirname, basename */
#ifndef WIN32
#include <unistd.h>
#include <signal.h>
#else
-#include <sysstat.h>
#include <process.h>
#include <winsock2.h>
#define snprintf sprintf_s
#endif
-#include <sys/stat.h>
-#include <sys/types.h>
#include <mosquitto.h>
#include "client_shared.h"
@@ -55,129 +51,8 @@ void my_signal_handler(int signum)
#endif
void print_message(struct mosq_config *cfg, const struct mosquitto_message *message);
-int mkpath(const char *path, mode_t mode);
-void _fmask(char *fmask, void *obj, const struct mosquitto_message *message);
+void print_message_file(struct mosq_config *cfg, const struct mosquitto_message *message);
-/*
-File open with given mode.
-returns file descriptor (fd)
-*/
-/* ------------------------------------------------------------- */
-FILE *_mosquitto_fopen(const char *path, const char *mode)
-{
-#ifdef WIN32
- char buf[MAX_PATH];
- int rc;
- rc = ExpandEnvironmentStrings(path, buf, MAX_PATH);
- if(rc == 0 || rc == MAX_PATH) {
- return NULL;
- }else {
- return fopen(buf, mode);
- }
-#else
- return fopen(path, mode);
-#endif
-}
-
-void my_message_file_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
-{
- struct mosq_config *cfg;
- int i;
- bool res;
-
- if(process_messages == false) return;
-
- assert(obj);
- cfg = (struct mosq_config *)obj;
-
- if(cfg->retained_only && !message->retain && process_messages){
- process_messages = false;
- mosquitto_disconnect(mosq);
- return;
- }
-
- if(message->retain && cfg->no_retain) return;
- if(cfg->filter_outs){
- for(i=0; i<cfg->filter_out_count; i++){
- mosquitto_topic_matches_sub(cfg->filter_outs[i], message->topic, &res);
- if(res) return;
- }
- }
- cfg->fmask_topic = message->topic;
-
- FILE *fptr = NULL;
-
- if(cfg->format == NULL && strlen(cfg->fmask) >= 1) {
- _fmask(cfg->fmask, cfg, message);
- }
- if(cfg->format && strlen(cfg->fmask) == 0) { /* experimental */
- _fmask(cfg->format, cfg, message);
- }
-
- char *path, *prog;
- path = dirname(strdup(cfg->ffmask));
- prog = basename(strdup(cfg->ffmask));
-
- mkpath(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
-
- /* reasonable method to distinguish between directory
- * and a writable node (by default is off) */
- if(cfg->nodesuffix) {
- char *sf = cfg->nsuffix; /* limit 16 bytes. */
- sf = stpcpy(sf, cfg->nodesuffix);
- if(cfg->nsuffix) {
- char *to = cfg->ffmask;
- to = stpcpy(to, path);
- to = stpcpy(to, "/");
- if(prog) {
- to = stpcpy(to, prog);
- }
- to = stpcpy(to, ".");
- to = stpcpy(to, cfg->nsuffix);
- }
- }
-
- if(cfg->overwrite) {
- fptr = _mosquitto_fopen(cfg->ffmask, "w");
- } else {
- fptr = _mosquitto_fopen(cfg->ffmask, "a");
- }
-
- if(!fptr){
- fprintf(stderr, "Error: cannot open outfile, using stdout - %s\n", cfg->ffmask);
- // need to do normal stdout
- //mosquitto_message_callback_set(mosq, "my_message_callback");
- } else{
- if(cfg->verbose){
- if(message->payloadlen){
- fprintf(fptr, "%s ", message->topic);
- fwrite(message->payload, 1, message->payloadlen, fptr);
- if(cfg->eol){
- fprintf(fptr, "\n");
- }
- }else{
- if(cfg->eol){
- fprintf(fptr, "%s (null)\n", message->topic);
- }
- }
- }else{
- if(message->payloadlen){
- fwrite(message->payload, 1, message->payloadlen, fptr);
- if(cfg->eol){
- fprintf(fptr, "\n");
- }
- }
- }
- fclose(fptr);
- }
- if(cfg->msg_count>0){
- msg_count++;
- if(cfg->msg_count == msg_count){
- process_messages = false;
- mosquitto_disconnect(mosq);
- }
- }
-}
void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message)
{
@@ -204,7 +79,11 @@ void my_message_callback(struct mosquitto *mosq, void *obj, const struct mosquit
}
}
- print_message(cfg, message);
+ if(cfg->fmask){
+ print_message_file(cfg, message);
+ }else{
+ print_message(cfg, message);
+ }
if(cfg->msg_count>0){
msg_count++;
@@ -331,7 +210,7 @@ void print_usage(void)
printf(" allowed masks are:\n");
printf(" @[epoch|date|year|month|day|datetime|hour|min|sec|id|topic[1-9]] \n");
printf(" eg. --fmask='@id@-@date@-@topic' for file id-20101221-topicname\n");
- printf(" NOTE: option -F (new in v1.5.0) does have any effect when used with --fmask\n");
+ printf(" NOTE: enabled (experimental) use of option -F <value> with empty --fmask "" \n");
printf(" --nodesuffix : suffix for leaf/text node, when --fmask is provided\n");
printf(" --overwrite : overwrite the existing output file, can be used with --fmask only.\n");
printf(" --will-payload : payload for the client Will, which is sent by the broker in case of\n");
@@ -422,12 +301,7 @@ int main(int argc, char *argv[])
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
}
mosquitto_connect_with_flags_callback_set(mosq, my_connect_callback);
-
- if(cfg.isfmask) {
- mosquitto_message_callback_set(mosq, my_message_file_callback);
- } else {
- mosquitto_message_callback_set(mosq, my_message_callback);
- }
+ mosquitto_message_callback_set(mosq, my_message_callback);
rc = client_connect(mosq, &cfg);
if(rc) return rc;
diff --git a/sub_client_output.c b/sub_client_output.c
index 878aafb..e52c184 100644
--- a/sub_client_output.c
+++ b/sub_client_output.c
@@ -1,5 +1,5 @@
/*
-Copyright (c) 2009-2018 Roger Light <roger@atchoo.org>
+Copyright (c) 2009-2019 Roger Light <roger@atchoo.org>
Copyright (c) 2015-2019 V.Krishn <vkrishn@insteps.net>
All rights reserved. This program and the accompanying materials
@@ -24,6 +24,7 @@ Contributors:
#include <stdlib.h>
#include <string.h>
#include <time.h>
+#include <libgen.h> /* dirname, basename */
#ifndef WIN32
#ifdef HAVE_UNISTD_H
#include <unistd.h>
@@ -373,7 +374,7 @@ static int do_mkdir(const char *path, mode_t mode)
** each directory in path exists, rather than optimistically creating
** the last element and working backwards.
*/
-int mkpath(const char *path, mode_t mode)
+static int mkpath(const char *path, mode_t mode)
{
char *pp;
char *sp;
@@ -548,7 +549,7 @@ static void _setfmask(char *token, void *obj)
/* Expand --fmask string options for output filename. */
/* ------------------------------------------------------------- */
-void _fmask(char *fmask, void *obj, const struct mosquitto_message *message)
+static void _fmask(char *fmask, void *obj, const struct mosquitto_message *message)
{
struct mosq_config *cfg;
@@ -596,3 +597,96 @@ void _fmask(char *fmask, void *obj, const struct mosquitto_message *message)
}
+/*
+File open with given mode.
+returns file descriptor (fd)
+*/
+/* ------------------------------------------------------------- */
+static FILE *_mosquitto_fopen(const char *path, const char *mode)
+{
+#ifdef WIN32
+ char buf[MAX_PATH];
+ int rc;
+ rc = ExpandEnvironmentStrings(path, buf, MAX_PATH);
+ if(rc == 0 || rc == MAX_PATH) {
+ return NULL;
+ }else {
+ return fopen(buf, mode);
+ }
+#else
+ return fopen(path, mode);
+#endif
+}
+
+void print_message_file(struct mosq_config *cfg, const struct mosquitto_message *message)
+{
+
+ cfg->fmask_topic = message->topic;
+
+ FILE *fptr = NULL;
+
+ if(cfg->format == NULL && strlen(cfg->fmask) >= 1) {
+ _fmask(cfg->fmask, cfg, message);
+ }
+ if(cfg->format && strlen(cfg->fmask) == 0) { /* experimental */
+ _fmask(cfg->format, cfg, message);
+ }
+
+ char *path, *prog;
+ path = dirname(strdup(cfg->ffmask));
+ prog = basename(strdup(cfg->ffmask));
+
+ mkpath(path, S_IRWXU | S_IRWXG | S_IROTH | S_IXOTH);
+
+ /* reasonable method to distinguish between directory
+ * and a writable node (by default is off) */
+ if(cfg->nodesuffix) {
+ char *sf = cfg->nsuffix; /* limit 16 bytes. */
+ sf = stpcpy(sf, cfg->nodesuffix);
+ if(cfg->nsuffix) {
+ char *to = cfg->ffmask;
+ to = stpcpy(to, path);
+ to = stpcpy(to, "/");
+ if(prog) {
+ to = stpcpy(to, prog);
+ }
+ to = stpcpy(to, ".");
+ to = stpcpy(to, cfg->nsuffix);
+ }
+ }
+
+ if(cfg->overwrite) {
+ fptr = _mosquitto_fopen(cfg->ffmask, "w");
+ } else {
+ fptr = _mosquitto_fopen(cfg->ffmask, "a");
+ }
+
+ if(!fptr){
+ fprintf(stderr, "Error: cannot open outfile, using stdout - %s\n", cfg->ffmask);
+ // need to do normal stdout
+ //mosquitto_message_callback_set(mosq, "my_message_callback");
+ } else{
+ if(cfg->verbose){
+ if(message->payloadlen){
+ fprintf(fptr, "%s ", message->topic);
+ fwrite(message->payload, 1, message->payloadlen, fptr);
+ if(cfg->eol){
+ fprintf(fptr, "\n");
+ }
+ }else{
+ if(cfg->eol){
+ fprintf(fptr, "%s (null)\n", message->topic);
+ }
+ }
+ }else{
+ if(message->payloadlen){
+ fwrite(message->payload, 1, message->payloadlen, fptr);
+ if(cfg->eol){
+ fprintf(fptr, "\n");
+ }
+ }
+ }
+ fclose(fptr);
+ }
+
+}