NiFi and JSON

Remark: with the introduction of record-oriented flow files, managing JSON in NiFi became easier than ever.
The how-to below about JSON manipulation makes extensive use of content and attribute extraction and modification.
Later pages cover the use of records in NiFi.


Starting point

Unlike Logstash, NiFi does not automatically transform each event into JSON. The output processors of Logstash create a valid JSON representation of your data by default (codec = json). So if your event sources do not send JSON documents, building the JSON representation must be part of the flow you define for these events.
The PutElasticsearchHttp and PutElasticsearch5 processors push the content of the flow file to Elasticsearch without any transformation. So before submitting a flow file to either of these processors, you must make sure its content is a valid JSON representation of your data.
In the case of syslog messages, the ListenSyslog processor will not generate a JSON representation of the event, but it can parse the message and put all the elements of the event into flow file attributes (severity, timestamp, facility, …).
For any incoming message that is not JSON formatted, the best approach to build the JSON string is to:
  • Extract parts of the flow file content and store them in flow file attributes
Recommended processors: ListenSyslog or ParseSyslog for syslog messages only, ExtractText, or ExtractGrok (since 1.2.0)
  • Add or modify attributes (UpdateAttribute)
  • Create the JSON string from the values of the attributes
To build the JSON string yourself: ReplaceText
To generate valid JSON from the existing attributes: AttributesToJSON
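As an illustration, a minimal syslog flow built on this approach could chain the processors as follows (the processor names are real, the layout is just one possible sketch):

ListenSyslog (parses the message into syslog.* attributes)
    → UpdateAttribute (adds or fixes attributes)
    → AttributesToJSON (builds the JSON content)
    → PutElasticsearchHttp (indexes the resulting JSON document)

Each step is detailed in the sections below.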

Parsing content into attributes

To extract information from the content and store it into attributes, you can use:
  • The ListenSyslog processor starts a syslog server that listens directly for syslog clients. It extracts the significant pieces of the syslog message (message, host, sender, timestamp, facility, severity and priority) and puts them into flow file attributes.
  • The ParseSyslog processor does the same as above, but inside a flow, on the content of the flow files sent to it.
  • ExtractText lets you build a regular expression with capture group(s) that is used to extract information from the content of the flow files and store the results in attributes you specify (see the sketch at the end of this section).
  • ExtractGrok is new in NiFi 1.2.0 and lets you use the same GROK expressions as in Logstash to extract meaningful information from the content of the flow files and store it in attributes named “grok.<identifier>”. Take the following GROK pattern:
RT @%{USERNAME:retweet_user}: %{GREEDYDATA:message}
If it matches, two attributes will be created in the flow file:
  • grok.retweet_user
  • grok.message
You can also configure this processor to store the GROK results not in attributes but in the content, replacing the old content with a valid JSON representation of all the elements matched by your pattern.
Thus, with the GROK example above, the resulting content of your flow file will be:
{ "retweet_user":"someuser", "message":"the whole text of his message" }
Unlike in Logstash, you have to point your ExtractGrok processor to a file containing your GROK pattern definitions. You can reuse the GROK patterns from the Logstash stack: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
You can test your GROK search patterns at the following address: http://grokdebug.herokuapp.com/
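A pattern file is simply a list of “NAME regex” lines. A minimal file covering the example above could look like this (both definitions are taken from the standard Logstash patterns):

USERNAME [a-zA-Z0-9._-]+
GREEDYDATA .*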
We are not going to list exhaustively all the processors able to parse content into attributes. We hope the four examples above show that some processors are tied to a given type of message (e.g. syslog) while others are generic and apply to any structured message.
ExtractText or ExtractGrok could be used against tweets, Apache logs, system information, …
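For example, to extract the same two fields as the GROK pattern above with ExtractText, you could add a dynamic property to the processor (the property name “tweet” is an arbitrary choice of ours):

Property name:  tweet
Property value: RT @(\S+): (.*)

When the regular expression matches, ExtractText stores each capture group in a numbered attribute, here “tweet.1” (the user) and “tweet.2” (the message).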

Creating JSON with ReplaceText

When your flow has generated flow files in which different attributes hold the information you want to keep, you just have to redirect the “success” relationship of the last processor to a ReplaceText processor, which will replace the text of the flow file with a JSON string:
  • Search Value property: (?s:^.*$) to match the whole content
  • Replacement Value property:
{
  "timestamp":"${syslog.timestamp}",
  "facility":${syslog.facility},
  "severity":${syslog.severity},
  "priority":${syslog.priority},
  "hostname":"${syslog.hostname}",
  "sender":"${syslog.sender}",
  "message":"${syslog.message}"
}
The above replacement string constructs a valid JSON string from the attributes defined by the ListenSyslog input processor. Note that facility, severity and priority are left unquoted because their values are numeric.
The last point of attention, when you have to build the JSON representation of the message yourself, is to escape the characters that are not supported as-is or that would break the JSON formatting (like double quotes and so on).
When you create a JSON string with one of the NiFi processors, you can use the expression language function “escapeJson”. So a more correct implementation of our replacement above should read:
{
  "timestamp":"${syslog.timestamp}",
  "facility":${syslog.facility},
  "severity":${syslog.severity},
  "priority":${syslog.priority},
  "hostname":"${syslog.hostname:escapeJson()}",
  "sender":"${syslog.sender:escapeJson()}",
  "message":"${syslog.message:escapeJson()}"
}
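To see why this matters, suppose the syslog.message attribute contains the (hypothetical) text: user "root" logged in. Without escaping, the embedded double quotes would terminate the JSON string early and produce invalid JSON; with escapeJson() the field is rendered correctly:

  "message":"user \"root\" logged in"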

Using AttributesToJSON

This processor replaces the content of the flow file with a JSON representation of its attributes, or creates a new attribute (named JSONAttributes) containing the JSON result. The latter is useful if you need the original content for further processing, rather than a JSON representation of it.
In the properties of this processor:
  • You can define the list of attributes you want to include in the JSON string (by default, it takes all of them).
  • You may want to include the core attributes too (attributes present in every flow file and populated automatically).
  • You also define the value to use for empty or non-existing attributes (null or an empty string).
  • You define whether the resulting JSON is stored in a new attribute or as the content of the flow file.
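As an illustration, a configuration that keeps only some of the syslog fields from the previous section could look like this (the property names are those exposed by the processor; the attribute list and sample values are ours):

Attributes List:         syslog.timestamp, syslog.hostname, syslog.message
Destination:             flowfile-content
Include Core Attributes: false
Null Value:              false

With this configuration, the flow file content would be replaced by something like:

{"syslog.timestamp":"Apr 10 13:55:36","syslog.hostname":"myhost","syslog.message":"service started"}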

Other possibilities

We saw above that the ExtractGrok processor can be used to replace the content of the flow file with a valid JSON representation of the data matched by its GROK expression.
This is a nice approach when moving from Logstash towards NiFi.
It is also an approach that requires you to type less information, and therefore to make fewer errors.

Modify the value of one JSON attribute

If you have a flow file containing JSON, you can manipulate it by adding, removing or modifying any JSON attribute. The best approach is to:
  • If you receive a message whose content is already JSON formatted, use EvaluateJsonPath. Given the following content:
{ "field1":"value 1",
  "field2": { "sub1":"value2",
              "sub2":"value3" } }
You can create a flow file attribute for any JSON field by adding a dynamic property with the following syntax:
  1. Property name: the name you want for your new attribute
  2. Property value: “$.field1” (to access “value 1”) or “$.field2.sub1” (to access nested field values)
  • Of course, if your message is not yet in JSON format, you will have to use one of the processors that extract information from the content and store it as attributes in the flow file (see the examples above).
  • Then we pass the flow file to the UpdateAttribute processor. With this one, you can:
    1. Add attributes, using a fixed value or a computed expression, like ${now():format('yyyy/MM/dd HH:mm:ss.SSS')} to create a timestamp.
    2. Modify the content of existing attributes, possibly reusing the original value of the attribute or setting a totally different value.
    3. Delete one or more attributes.
    4. Add or modify an attribute after performing some checks. This is done with the “Advanced usage” panel, where we can define a broad set of rules, each rule containing one or more checks which, if matched (any or all), trigger a specific attribute creation or modification.
See in the “NiFi for Syslog” configuration how we use UpdateAttribute to add a textual representation of the syslog message severity, which is only present as a digit in the original message.
  • Finally, create a JSON string from the attributes using the AttributesToJSON processor. See the paragraph above for a more detailed usage of this processor. A sketch of the complete chain follows.
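Putting the chain together on the sample content above, a minimal sketch could be (the attribute name “sub1” and the toUpper() transformation are our own choices for the example):

EvaluateJsonPath
  Destination:  flowfile-attribute
  sub1:         $.field2.sub1          (dynamic property: creates the attribute “sub1”)

UpdateAttribute
  sub1:         ${sub1:toUpper()}      (dynamic property: rewrites the attribute)

AttributesToJSON
  Attributes List:  sub1
  Destination:      flowfile-content

The resulting flow file content would then be {"sub1":"VALUE2"}.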