NiFi for Apache - the flow

Presentation

In the previous guide, you installed, configured and enabled the MiNiFi agent on each of your web servers. Now, it is time to build a flow on your central NiFi server to do something with the information that will be sent to it.

 

Building up a flow on the NiFi server

We are now back in the workspace of our NiFi server.
If you have followed this guide line by line, you should only have one input port called “RemoteMiNiFi” on it.
Let’s add some more processors to upload our Apache access log events to an ElasticSearch cluster (running version 5.x of the software).
 

Add a SplitText processor

A flow file sent to NiFi may well contain more than one log line, but the next processor only works when the content of the flow file holds a single line of information.
The split is therefore done line by line, with a line count of 1 per split, so each outgoing flow file contains exactly one line.
We don’t have any header in the incoming flow file content, and we want to remove the trailing newline characters.
1.Apache_splitText.PNG
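As a sketch, the SplitText properties used for this behaviour would be the following (the values are the ones implied above):

    Line Split Count          : 1
    Header Line Count         : 0
    Remove Trailing Newlines  : true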
 

Add an ExtractGrok processor

This processor is new in version 1.2 of NiFi and lets you use the power of the GROK extraction language to fill in multiple attributes at once.
With a single GROK expression, defined against the GROK pattern file, we will populate multiple attributes of our flow file with all the components of the Apache2 Combined log event.
You need to upload the GROK pattern file to your NiFi server yourself; we chose to create an /opt/nifi-custom folder on our NiFi servers to store all the customizations that are not included in NiFi.
We built the GROK pattern file from the ElasticSearch/Logstash patterns repository on GitHub: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
We took the httpd and grok-patterns files and merged them together into one file (/opt/nifi-custom/grok-patterns). <link to this file>
 
Note that each field extracted from the parsed text will be stored in a flow file attribute called “grok.<name of field>”.
2.Apache_GrokExtractor.PNG
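For reference, a minimal sketch of the key ExtractGrok properties for this setup could look like the following. The pattern name %{HTTPD_COMBINEDLOG} is an assumption based on recent logstash-patterns-core files; older pattern files call the same pattern COMBINEDAPACHELOG, so use whichever name appears in your merged file:

    Grok Expression      : %{HTTPD_COMBINEDLOG}
    Grok Pattern File    : /opt/nifi-custom/grok-patterns
    Destination          : flowfile-attribute
    Character Set        : UTF-8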
 

Add a GeoEnrichIP processor

GeoEnrichIP will be used to geolocate the IP address of the visiting client and add information such as country name, ISO code and geographical coordinates.
 
You need to upload the GeoIP database yourself. This processor supports MaxMind’s GeoLite2 database, which is free to use; to install it, copy the database file onto the NiFi server as sketched below.
The geo enrichment results will be put in attributes appended to “grok.clientip”, like “grok.clientip.geo.country”.
3.Apache_GeoEnrichIP.PNG
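A minimal installation and configuration sketch, assuming you have already obtained the GeoLite2 City archive from MaxMind (the exact download procedure and archive name depend on MaxMind’s current distribution) and reusing the /opt/nifi-custom folder created earlier:

    # Unpack the GeoLite2 City archive and place the .mmdb file with our other NiFi customizations
    tar -xzf GeoLite2-City.tar.gz
    cp GeoLite2-City_*/GeoLite2-City.mmdb /opt/nifi-custom/GeoLite2-City.mmdb

    # GeoEnrichIP processor properties
    MaxMind Database File : /opt/nifi-custom/GeoLite2-City.mmdb
    IP Address Attribute  : grok.clientip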
               

Add an ExtractText processor

The role of this processor is to copy the content of the flow file into one attribute called “Message”. We do this before converting the content of the flow file into JSON, so that we can easily add a JSON field carrying the original content of the Apache2 access log message.
We create a user-defined property that we call “Message”. The value of this property is a simple regex selecting everything in one capture group: (.*).
This processor will therefore add an attribute called “Message” containing the full content to each flow file.
4.Apache_ExtractMessage.PNG
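The ExtractText configuration therefore boils down to a single user-defined property (ExtractText also emits indexed variants such as Message.0 and Message.1, which we simply ignore here):

    Message : (.*)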
 

Add an UpdateAttribute processor

Some attributes need to be created, some existing ones need to be updated, and we also want to get rid of the default GROK attribute naming “grok.<something>”.
5.Apache_RenameGrokAttr-1.PNG
5.Apache_RenameGrokAttr-2.PNG
 
The configuration of this processor will enforce the following (a property-level sketch follows the list):
  • Put the country name and the IP address into new attributes, because by default their values are attached to what will become the name of a nested JSON document. Valid JSON does not let you assign a value to the name of a nested document.
Without this change, if we add the grok.clientip and grok.clientip.geo.* attributes to form a JSON document, we end up creating a nested document called “clientip” inside “grok”, so the resulting JSON would look like:
grok {
   clientip {
      value: text,
      geo {
         (...)
      }
   }
}


Per the JSON specification, grok.clientip cannot hold both a plain value and the nested document, so ElasticSearch would reject the document with an error.
  • Some attributes extracted by ExtractGrok and some fields of the nested document created by the GeoIP lookup are moved into simple, non-dotted attribute names. E.g. the new attribute “CountryIso” will contain the value of the grok.clientip.geo.country.isocode field.
Later, when we create the JSON document, we will have something flat and easy to understand.
  • Extract the file name of the log file we are tailing into an attribute, so we can distinguish between the various Apache virtual hosts we may have on the same server.
  • Create a “host” attribute containing the value of the “s2s.host” attribute, which is the name of the server where MiNiFi is running.
  • Create a new timestamp in a format that ElasticSearch recognizes by default, without tweaking the default schema and mapping. The full expression used is: ${grok.timestamp:toDate("dd/MMM/yyyy:HH:mm:ss Z"):format("yyyy-MM-dd'T'HH:mm:ssZZ")}
First, “toDate” indicates the current format of the timestamp; then, by applying “format” to the expression, we convert it to a different timestamp format.
  • Add an attribute “type” with a default value of “human”. The next step will analyse the agent attribute to determine whether the visitor is a human or some kind of robot.
  • Add an attribute “website” derived from the log file name, so that if we monitor more than one log file on the web server, we know which Apache virtual host the access information belongs to.
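A property-level sketch of this UpdateAttribute configuration. The attribute names on the left are the ones used in this flow; the right-hand expressions assume the default GROK and GeoEnrichIP attribute names described above, and the “logfile” and “website” derivations are only illustrative examples, since the actual source attribute depends on how the MiNiFi tailing is set up:

    clientip   : ${grok.clientip}
    country    : ${grok.clientip.geo.country}
    countryiso : ${grok.clientip.geo.country.isocode}
    host       : ${s2s.host}
    timestamp  : ${grok.timestamp:toDate("dd/MMM/yyyy:HH:mm:ss Z"):format("yyyy-MM-dd'T'HH:mm:ssZZ")}
    type       : human
    logfile    : ${filename}                        # illustrative only
    website    : ${filename:substringBefore('.')}   # illustrative only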
 

Add another UpdateAttribute processor

We will use this processor to interpret the content of the “agent” attribute.
The agent attribute usually contains information about the web browser used to visit the web site. Non-human visitors (crawlers, search engines, …) most often use specific names as the agent value.
We will build a set of rules in the processor’s Advanced configuration (flow file policy) to set the appropriate value into the “type” attribute.
6.Apache_VisitorType.PNG
Each rule we create has one condition: detecting whether a specific string is present in the “agent” attribute. If it is, we set the “type” attribute to a certain value.
If no rule matches, the default value of “human”, set in the previous step, is kept.
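For illustration, one such rule might look like this in the Advanced UI (the bot name “Googlebot” and the resulting value “robot” are just examples, not the complete rule set):

    Rule: Googlebot
      Condition : ${agent:contains('Googlebot')}
      Action    : type = robot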
 

Add an AttributesToJSON processor

All the attributes listed below are then merged together into the content of the flow file, in the form of a JSON document, by this processor.
               
Attributes to include in the resulting JSON:
     website,  
     host,
     type,
     logfile,
     agent,
     auth,
     bytes,
     clientip,
     country,
     countryiso,
     region_name,
     region_code,
     postal_code,
     city_name,
     latitude,
     location,
     longitude,
     version,
     ident,
     referrer,
     request,
     response,
     timestamp,
     verb,
     message

 
7.Apache_AttributesToJSON.PNG
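As a sketch, the corresponding AttributesToJSON properties would be along these lines. The Attributes List simply concatenates the list above; the values for Destination, Include Core Attributes and Null Value are assumptions consistent with writing exactly that list into the flow file content:

    Attributes List         : website,host,type,logfile,agent,auth,bytes,clientip,country,countryiso,region_name,region_code,postal_code,city_name,latitude,location,longitude,version,ident,referrer,request,response,timestamp,verb,message
    Destination             : flowfile-content
    Include Core Attributes : false
    Null Value              : false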
 

Add the PutElasticsearch5 processor

Finally, add the processor that writes to the ElasticSearch cluster.
You provide:
  • the name of your cluster
  • a list of the nodes to use for this ingestion (using port 9300, because this processor uses the native ElasticSearch transport protocol)
  • the name of an attribute containing the unique identifier of the document we store in the index (the uuid attribute is present by default in every flow file)
  • the name of the index to use (it will be created if it does not exist)
  • the type of document: a name that you can use inside ElasticSearch to refer to this family of documents, notably for mapping activities
  • the other mandatory properties are left at their defaults.
8.Apache_ElasticSearch5.PNG
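A sketch of the key PutElasticsearch5 properties, with placeholder values (the cluster name, host names, index and type below are examples; replace them with your own):

    Cluster Name          : elasticsearch
    ElasticSearch Hosts   : es-node1:9300,es-node2:9300
    Identifier Attribute  : uuid
    Index                 : apache_access
    Type                  : access_log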
 
The final flow as seen on the NiFi workspace:
apache-nifi-flow-complete.PNG
 
You can download the NiFi template of the above flow here.
 

Prepare a default mapping on ElasticSearch

If we want to force ElasticSearch to interpret the content of the bytes field as numeric, and not as text (the default behaviour), we need to add a default mapping for the indices created by this flow.
Refer to the ElasticSearch field mapping configuration to force bytes to be “long” rather than “text”.
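As a sketch for ElasticSearch 5.x, an index template like the following would do it. The template name and the index pattern are examples; they must match the index name configured in the PutElasticsearch5 processor:

    PUT _template/apache_access
    {
      "template": "apache_access*",
      "mappings": {
        "_default_": {
          "properties": {
            "bytes": { "type": "long" }
          }
        }
      }
    }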