NiFi for Syslog

Let’s use NiFi to build a flow similar to the one we built with Logstash, storing syslog messages in an Elasticsearch index.


Receiving the messages

We start with NiFi’s ListenSyslog processor, which can be configured to listen for syslog on any UDP or TCP port. When listening on TCP, you must specify the maximum number of concurrent TCP connections. This parameter depends on the number of systems sending syslog messages simultaneously to your listener.
Because we have only a few systems, each of them will forward its messages directly to NiFi’s syslog listener, using UDP for simplicity.
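To check that the listener receives something before pointing real systems at it, a small test sender can help. The following is a minimal sketch, not part of the original flow: the function names, the host name "web01" and the port 5140 are illustrative assumptions, and the message uses the classic BSD-style layout (priority, timestamp, host, tag, text).

```python
import socket
from datetime import datetime

MONTHS = ("Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec")


def build_syslog_message(facility, severity, hostname, tag, text, when=None):
    """Build a minimal BSD-style syslog line: <PRI>timestamp host tag: text."""
    when = when or datetime.now()
    pri = facility * 8 + severity  # the priority combines facility and severity
    # The day of month is space-padded to two characters in the BSD format.
    stamp = f"{MONTHS[when.month - 1]} {when.day:2d} {when:%H:%M:%S}"
    return f"<{pri}>{stamp} {hostname} {tag}: {text}".encode("utf-8")


def send_syslog_udp(message, host, port):
    """Send one datagram; UDP gives no confirmation that it was received."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(message, (host, port))


# Example call (hypothetical address; use the port configured in ListenSyslog):
# send_syslog_udp(build_syslog_message(1, 5, "web01", "demo", "hello NiFi"),
#                 "127.0.0.1", 5140)
```

Because UDP is fire-and-forget, the only confirmation is a flow file showing up in the queue after the ListenSyslog processor.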
The ListenSyslog processor parses the messages it receives into their syslog components (priority, facility, severity, sending host, timestamp, …). If you receive syslog messages some other way (for example from a queue such as Kafka, or by tailing the syslog log files directly), you can use the ParseSyslog processor to extract the same components.
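Facility and severity are not sent separately: both are packed into the priority number at the start of the message, with priority = facility × 8 + severity. The sketch below only illustrates that arithmetic; it is not NiFi’s parser, and the function name and dictionary keys are our own.

```python
import re

# The priority is the decimal number between angle brackets at the start.
PRI_PATTERN = re.compile(r"^<(\d{1,3})>")


def decode_priority(raw_message):
    """Return priority, facility and severity found at the start of a message."""
    match = PRI_PATTERN.match(raw_message)
    if match is None:
        raise ValueError("no <PRI> header found")
    priority = int(match.group(1))
    facility, severity = divmod(priority, 8)  # facility * 8 + severity
    return {"priority": priority, "facility": facility, "severity": severity}


print(decode_priority("<34>Oct 11 22:14:15 mymachine su: login failed"))
```

For a priority of 34 this gives facility 4 and severity 2, because 4 × 8 + 2 = 34.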


Enrichment of the message using UpdateAttribute

Unfortunately, the parsing done by the ListenSyslog or ParseSyslog processors does not interpret the numeric codes (facility and severity) of the syslog message. So if you want to enrich your data with a textual representation of the syslog severity and facility, you have three options:
  1. Add a new processor to the flow, UpdateAttribute, and define a lot of rules:
    • Rule name: “Severity 1”
    • Conditions: “${syslog.severity:equals(1)}”
    • Actions:
      • attribute: “syslog.severity_text”,
      • value: “Alert”
You then create a new rule for each condition to test, because we don’t see a way to write a single “if … then … else” or “case” statement with the NiFi Expression Language. This is very tedious: you have to implement one rule for each severity and each facility if you want a textual representation of them.
  2. At the source, use syslog software that lets you format the output message the way you want. Rsyslog, for example, lets you define message templates that can be formatted as a JSON string.
If you use this form of output in your syslog daemon, you must:
  • Define this template on all hosts, and therefore distribute this specific configuration to all of your systems
  • Or, alternatively, place an Rsyslog server between your systems and NiFi. All systems send standard syslog messages to the central Rsyslog server, which in turn forwards them to NiFi’s syslog listener after formatting them as valid JSON.
  • In both cases, you must configure the ListenSyslog processor not to parse the syslog message, because it no longer follows the standard format.
  3. Later, at the visualization or aggregation step, convert the numeric codes into a textual representation of the severity and the facility. How you do this depends on where you store the messages and on the mechanisms used to visualize them.
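Whichever option you choose, the conversion itself is a plain lookup table. The sketch below shows it in Python under a few assumptions: attributes arrive as strings in a dictionary, “syslog.facility_text” is a name we made up to mirror “syslog.severity_text”, and the short facility names follow common Linux conventions (the names for codes 12 to 15 vary between implementations).

```python
# Severity codes 0..7 as defined by the syslog standard.
SEVERITY_NAMES = ("Emergency", "Alert", "Critical", "Error",
                  "Warning", "Notice", "Informational", "Debug")

# Facility codes 0..23; names for 12..15 are an assumption and vary by system.
FACILITY_NAMES = ("kern", "user", "mail", "daemon", "auth", "syslog", "lpr",
                  "news", "uucp", "cron", "authpriv", "ftp", "ntp", "audit",
                  "alert", "clock") + tuple(f"local{i}" for i in range(8))


def lookup(names, code):
    """Translate a numeric code (given as text) into its name."""
    try:
        index = int(code)
    except (TypeError, ValueError):
        return "unknown"
    return names[index] if 0 <= index < len(names) else "unknown"


def enrich(attributes):
    """Return a copy of the attributes with textual severity and facility."""
    enriched = dict(attributes)
    enriched["syslog.severity_text"] = lookup(SEVERITY_NAMES, attributes.get("syslog.severity"))
    enriched["syslog.facility_text"] = lookup(FACILITY_NAMES, attributes.get("syslog.facility"))
    return enriched


print(enrich({"syslog.severity": "1", "syslog.facility": "4"}))
```

Two tables with 8 and 24 entries replace the 32 individual rules that the UpdateAttribute approach would need.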


Routing messages inside a flow using RouteOnAttribute

Now that we’ve started a flow, let’s use it to illustrate the routing inside NiFi.
You may want to be notified directly when a critical error is generated on one of your systems. You may also not want to record every Debug-level message in your data store.
Because we have one attribute for the severity and another one for the facility, we can use the RouteOnAttribute processor to branch our flow based on the values of these attributes.
Inside the RouteOnAttribute processor, we add a property for each rule (match) we want. The name of each property is used as the name of the output relationship when creating a connection to the next processor.
We create several properties, each doing a single check on the value of the attribute.
In this example, if a flow file has an attribute “syslog_severity” matching one of the properties, the flow file is routed to the relationship named after that property.
We could of course add more, and create a property for each possible severity level. For the purpose of the demonstration, we didn’t cover every possibility; we could have added “emerg”, “notice” and “information”.
We select ‘Route to Property name’ as the routing strategy, which lets us create connections by selecting the relationships named ‘critical’, ‘debug’, ‘error’ or ‘warning’ in this case. And because a single connection can carry multiple relationships, we can create flows such as:
  • “AllButDebug” by creating a connection where all relationships except “debug” are selected
  • “Debug”, a connection where we select only the “debug” relationship
  • “Error”, a connection with the “critical” and “error” relationships selected
  • Etc.

In the above example, we route on the property name. If the rules of several properties match, the flow file is routed to every matched relationship, and so possibly sent to different processors.
The other options for the “Routing strategy” route the flow files to a single relationship called “Matched” (and the ones not matching to one called “Unmatched”):
  • If any of the checks match
  • If all checks match
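The three routing strategies can be summarised with a small simulation. This is only a sketch of the behaviour described above, not NiFi code. It assumes the numeric severity is held in an attribute named “syslog.severity”, as in the UpdateAttribute rule earlier, and “not_debug” is an extra rule added here purely to show a flow file matching two properties.

```python
# Each rule maps a property name to a check on the flow file attributes.
RULES = {
    "critical": lambda a: a.get("syslog.severity") == "2",
    "error": lambda a: a.get("syslog.severity") == "3",
    "warning": lambda a: a.get("syslog.severity") == "4",
    "debug": lambda a: a.get("syslog.severity") == "7",
    "not_debug": lambda a: a.get("syslog.severity") != "7",  # hypothetical rule
}


def route(attributes, rules, strategy="property"):
    """Return the relationship names a flow file would be sent to."""
    matched = [name for name, check in rules.items() if check(attributes)]
    if strategy == "property":  # one relationship per matching property
        return matched or ["unmatched"]
    if strategy == "any":       # "matched" if at least one check matches
        return ["matched"] if matched else ["unmatched"]
    if strategy == "all":       # "matched" only if every check matches
        all_match = bool(rules) and len(matched) == len(rules)
        return ["matched"] if all_match else ["unmatched"]
    raise ValueError(f"unknown strategy: {strategy}")


flow_file = {"syslog.severity": "2"}
print(route(flow_file, RULES))         # routed to both matching properties
print(route(flow_file, RULES, "any"))
print(route(flow_file, RULES, "all"))
```

With a severity of 2, the first call returns both “critical” and “not_debug”, which is the case where one flow file ends up on several relationships.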


Configuration of the resulting flow(s) for Syslog


Access to the flow XML template: NiFi_for_Syslog.xml