NiFi and ElasticSearch

Custom mapping for the index you will update with NiFi flows

Unlike the Logstash “elasticsearch” output, you cannot associate a customized mapping with the processor. Therefore, if the dynamic mapping of ElasticSearch doesn’t assign the type you really want to one of your fields, you will have to use a default mapping template (see this chapter in the ElasticSearch section of the site).
If doing that, remember that:
  • Give your template a name/pattern that matches any name of the indices you will create. So if you create index names containing the current date, using an expression like “twitter-${now():format(‘yyyy-MM-dd’)}”, the template pattern must be “twitter-*” (or at least “twitter*”).
  • The default mapping must be loaded before you create your first index with your first document, because once established by ElasticSearch, the mapping of an existing field can no longer be changed.
  • The mapping will do the type casting: if the JSON representation of the document contains “bytes”: “100”, thus a string representation, forcing “bytes” to be “long” via the default mapping will successfully cast it to “long” in the ElasticSearch index. Of course, if for some reason the value contains a letter, like “100 KB”, then the document cannot be inserted into the index and ElasticSearch will return an error.
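As a sketch, such a default mapping template for ElasticSearch 5.x, forcing “bytes” to “long”, could look like the following (the template name “twitter_template” and the “default” document type are assumptions here; in 6.x the “template” key becomes “index_patterns”). It would be loaded with a PUT to _template/twitter_template:

```json
{
  "template": "twitter-*",
  "mappings": {
    "default": {
      "properties": {
        "bytes": { "type": "long" }
      }
    }
  }
}
```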

Definition of a correct timestamp field for your time series

A second difference between using Logstash and NiFi to insert documents into an ElasticSearch index is the management of the “timestamp” field.
When you store time series messages, that is documents for which the date and time of the generation of the message must be retained as such, you need at least one field in each document holding this date and time information.
In Logstash, this is handled automatically: every event gets a “@timestamp” field based on the time the event is processed. This is acceptable when the messages come from a live stream. With NiFi, no timestamp field is created by default, so be sure you have one correctly set before submitting the document to ElasticSearch.
Yes, but how?
The generator of the message usually puts a field with the date and time somewhere in the message. For the Twitter API, a field called “created_at” is present inside the JSON of the tweet message. In the case of Syslog messages, you also have a field generated by the syslog system with the date and time when the message was generated.
In most cases, ElasticSearch automatic mapping will discover that some fields hold a date and time representation. In this case you are lucky and have nothing more to do.
But in some other cases, the format used by the field is not correctly interpreted as a date, so once again, the ElasticSearch default mapping is our friend.
Let’s take the case of Twitter and the “created_at” field. Here is an example of such a field value:
                "created_at":"Wed May 03 14:20:03 +0000 2017"
To have it interpreted as a valid date by ElasticSearch, we will use the following in our default mapping, loaded into ElasticSearch before any other operations:
  "mappings" : {
    "default" : {
      "properties" : {
        "created_at" : {
          "type" : "date",
          "format" : "EEE MMM dd HH:mm:ss Z YYYY"
        }
      }
    }
  }
Inside the section “mappings” -> “default” -> “properties” -> <field name>, we set the “type” to “date” and, very importantly, the format of the timestamp. In ElasticSearch 2.x and 5.x, the format follows the Joda-Time pattern syntax, which is very close to the Java definition for SimpleDateFormat (https://docs.oracle.com/javase/8/docs/api/java/text/SimpleDateFormat.html). One subtlety: in Joda-Time, “YYYY” is the year of era and parses as expected here, whereas in plain SimpleDateFormat “YYYY” would be the week year; “yyyy” is the safer spelling in both.
In the above example:
  • “EEE”, the day of the week, coded with its first 3 letters
  • “MMM”, the month: 3 M’s means the first 3 letters of the name, 2 M’s the number with a leading zero, 1 M the number without a leading zero, …
  • “dd”, the day of the month, with a leading zero
  • “HH:mm:ss”, the classical 24-hour representation of time, where 2 PM is 14
  • “Z”, the time zone offset, like +0200 or -0800
  • “YYYY”, the year, coded with 4 digits
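To check that the pattern above really matches the example value, here is the equivalent parse in Python (strptime directives instead of Joda/SimpleDateFormat letters; a sketch for verification, not how ElasticSearch itself parses it):

```python
from datetime import datetime

# Equivalent strptime directives for "EEE MMM dd HH:mm:ss Z YYYY":
# %a = abbreviated weekday, %b = abbreviated month, %d = day of month,
# %H:%M:%S = 24-hour time, %z = numeric time zone offset, %Y = 4-digit year
value = "Wed May 03 14:20:03 +0000 2017"
parsed = datetime.strptime(value, "%a %b %d %H:%M:%S %z %Y")

print(parsed.isoformat())  # 2017-05-03T14:20:03+00:00
```

If the parse fails (a letter in the value, a different layout), you get an exception here, just as ElasticSearch would reject the document with a mapping error.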
If your message doesn’t contain a timestamp, or you want to add the timestamp of the processing, you will have to use an UpdateAttribute processor that adds or modifies a flow file attribute using an expression like ${now()}.
Your timestamp may be generated with the event and be present somewhere in a complex string, so you will have to extract it together with other meaningful information in the rest of your flow (see the example about Apache2 logs – impl/bigdata/nifi/apache)
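For example, an UpdateAttribute rule could set an attribute like the following (the attribute name “timestamp” is an assumption; the pattern passed to format() follows Java SimpleDateFormat, as used by NiFi’s Expression Language):

```
timestamp = ${now():format("yyyy-MM-dd'T'HH:mm:ss.SSSZ")}
```

The attribute can then be merged into the JSON of the flow file content before the document is sent to ElasticSearch.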

PutElasticsearchHttp vs PutElasticsearch5 vs PutElasticsearch

PutElasticsearchHttp is the processor implementing the REST API over HTTP to talk to an ElasticSearch cluster. By default, this is done against TCP port 9200 of any node where the HTTP interface is active (every node by default).
PutElasticsearch and PutElasticsearch5 are implementations of the ElasticSearch transport protocol, the native protocol of ElasticSearch, which is version dependent. PutElasticsearch is for ElasticSearch version 2.x and PutElasticsearch5 is for version 5.x. The communication is done by default on TCP port 9300. This is the protocol the nodes of a cluster use to communicate among themselves.
The REST API, because it is over HTTP, can be used by any client implementation (even “curl” at the command line) to send API requests.
Barring drastic changes in API usage, the same API call will succeed whatever version of ElasticSearch you run.
Of course, ElasticSearch may introduce changes in the API and in the calls (new commands, removal of others, …) but the basics stay the same, so your client implementation does not need to change when you change the ElasticSearch version.
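As a sketch, indexing a single document over the REST API with curl could look like this (the host, index name, and document type “default” are assumptions; the Content-Type header is required from ElasticSearch 6.x onward and harmless before):

```
curl -XPOST 'http://localhost:9200/twitter-2017-05-03/default' \
     -H 'Content-Type: application/json' \
     -d '{ "created_at": "Wed May 03 14:20:03 +0000 2017", "bytes": "100" }'
```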

Advantages / drawbacks of both ways of talking to an ElasticSearch cluster

REST API

+ not coupled to the ES version
+ needs only a light implementation (a library to do HTTP calls)
- you talk to only one node at a time, so it is the REST client that must handle a missing node (detection, timeout management, switching to other nodes, …); you have to statically list all the addresses of the nodes you are allowed to connect to, and your client will connect to one of the addresses that responds.
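The node-switching logic a REST client must implement can be sketched as follows (the probe function is injected so the example stays self-contained; in a real client it would be an HTTP health check with a timeout, e.g. a GET on http://host:9200/ inside a try/except):

```python
def first_responding(hosts, probe):
    """Return the first host for which probe(host) is truthy.

    `hosts` is the static list of node addresses the client is allowed
    to use; `probe` stands in for a real HTTP request with a timeout.
    """
    for host in hosts:
        if probe(host):
            return host
    raise RuntimeError("no ElasticSearch node is responding")

# Usage: node1 and node2 are down, so the client switches to node3.
up = {"node1:9200": False, "node2:9200": False, "node3:9200": True}
chosen = first_responding(["node1:9200", "node2:9200", "node3:9200"], up.get)
print(chosen)  # node3:9200
```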

Transport protocol

- coupled to the ES version
- client app is heavier (specific ES libraries)
+ client becomes member of the cluster so is made automatically aware of internal routing (where shards are located, where documents reside, …)
+ performance for large workloads may be superior to the REST API, but don’t expect a 2x faster response time: the gain is in the order of 5-10% compared to the same query over the REST API, and even that difference vanishes when queries return few documents
+ fails over automatically between nodes when the cluster detects that a node is missing