<div dir="ltr"><div>hi all<br>i have a json messageĀ that contains a nested json list of dicts<br><br>{"a":1,"b":[{"c":1},{"c":2},{"c":3}]}<br><br>i want to flat that message, so expected result looks likeĀ {<br>"a": 1,<br>"b_0_c": 1,<br>"b_1_c": 2,<br>"b_2_c": 3<br>}<br><br>My approach is a python implemented parser.</div><div>Is it possible to achieve the same result using the built-in syslog-ng tools?</div><div>My solution below<br><br><pre style="white-space:pre-wrap;box-sizing:inherit;margin-top:4px;margin-bottom:4px;padding:8px;font-size:12px;line-height:1.50001;font-variant-ligatures:none;word-break:normal;border-radius:4px;color:rgb(29,28,29);font-family:Monaco,Menlo,Consolas,"Courier New",monospace">@define kafka-implementation kafka-c

python {

import collections
import json

class FlattenedJson(object):

    def parse(self, log_message, flat_message=None):
        def flatten(d, parent_key='', sep='_'):
            items = []
            for k, v in d.items():
                new_key = parent_key + sep + k if parent_key else k
                if isinstance(v, collections.MutableMapping):
                    items.extend(flatten(v, new_key, sep=sep).items())
                elif isinstance(v, list):
                    for idx, value in enumerate(v):
                        items.extend(flatten(value, new_key + sep + str(idx), sep).items())
                else:
                    items.append((new_key, v))
            return dict(items)
        try:
            decoded_msg = json.loads(log_message['MESSAGE'].decode('utf-8'))
            flat_message = flatten(decoded_msg)
            final_message = str(json.dumps(flat_message)).encode(encoding='utf-8')
            log_message['MESSAGE'] = final_message
        except Exception as error:
            log_message['python_error'] = 'An exception occurred: {}'.format(error)
        return True
};

destination d_kafka_dnstap {
  kafka(
    topic("mytopic")
    bootstrap-servers("localhost:9092")
    message("$(format-flat-json  --scope all-nv-pairs application_name=myapp @timestamp=${ISODATE} )")
  );
};

source s_net_dnstap { network( transport(udp) port(514) flags(no-parse) ); };

parser p_dnstap { channel {
    parser { python(class("FlattenedJson")); };
    parser { json-parser(prefix("dnstap.")); };
  };
};

log { source(s_net_dnstap); parser(p_dnstap); destination(d_kafka_dnstap); };</pre></div></div>