Data ingestion / Data pipelines

Definition

As you have seen, Big Data is not only about building a data store that can easily accommodate terabytes, petabytes or even more, but also about the tools that you can use to explore, analyze and feed it.
In this section, we will focus on the ingestion layer, also called the data pipeline.

Data is everywhere in your organization, and sometimes even outside of it. You need to collect it and let it flow smoothly into your data store, and sometimes transform it a bit while it is in transit. This is what data pipeline tools are about.
The operations they perform are often identified by the three-letter acronym ETL, a term that became popular in the Data Warehousing world some years ago and applies to the Big Data world as well.
ETL stands for Extract-Transform-Load. In short, it covers these 3 operations (a small code sketch follows the list):
  1. Extraction: once your source(s) are identified, you need to get the data from them. Either your ETL tool asks for it (pulling the information), or your source pushes it to some listener service managed by your tool.
  2. Transformation: this step, which is not mandatory, applies filtering, aggregation, format translation or any other technical or business rules to the data extracted during step 1. These operations aim to get rid of meaningless data, to reduce its footprint or to prepare it to be loaded into the target. E.g.: extract meaningful JSON attributes from an entity and organize them into a CSV string; interpret numeric codes present in the message, adding a human-readable field to the data; perform arithmetic conversions like aligning all size fields to the same unit.
  3. Load: once the data is extracted and transformed, it just needs to be loaded into the destination(s). Depending on the type of target, the data may be transformed one last time (usually this job is done by the driver connecting to the target). A target can be a simple flat text file or something more complex like a Kafka topic or a MongoDB collection.
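
To make these three steps concrete, here is a minimal sketch in plain Python, deliberately tool-agnostic (it is not the syntax of any of the products discussed below). It extracts JSON records from a file, transforms them by decoding a numeric status code and aligning a size field to bytes, as in the examples above, and loads the result into a CSV file. All file names, field names and code tables are illustrative assumptions.

    import csv
    import json

    # Hypothetical mapping of numeric status codes to human-readable labels.
    STATUS_LABELS = {0: "ok", 1: "warning", 2: "error"}

    # Hypothetical unit factors used to align every size field to bytes.
    UNIT_FACTORS = {"B": 1, "KB": 1024, "MB": 1024 ** 2}

    def extract(path):
        # Extract: read one JSON record per line from the source file.
        with open(path) as source:
            for line in source:
                yield json.loads(line)

    def transform(record):
        # Transform: keep only meaningful attributes, decode the code, align units.
        return {
            "host": record["host"],
            "status": STATUS_LABELS.get(record["status_code"], "unknown"),
            "size_bytes": record["size"] * UNIT_FACTORS[record["size_unit"]],
        }

    def load(records, path):
        # Load: write the transformed records into a flat CSV target.
        with open(path, "w", newline="") as target:
            writer = csv.DictWriter(target, fieldnames=["host", "status", "size_bytes"])
            writer.writeheader()
            writer.writerows(records)

    if __name__ == "__main__":
        load((transform(r) for r in extract("events.jsonl")), "events.csv")

In a real pipeline, the ingestion tool replaces the extract and load functions with its built-in sources and sinks, and the transform function with its filters or processors.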


The 3 tools

We have identified 3 main open-source tools in the Big Data ecosystem for this job:

  1. The Apache Flume project
  2. The Apache NiFi project
  3. Logstash from the Elastic Stack

All these tools have the following in common (a generic sketch in code follows the list):

  • You may define more than one source, not necessarily of the same kind.
  • At extraction time, you can attach tags / fields / attributes to the data that allow you to differentiate it further down your pipeline.
  • You may define filters (also called transformations or interceptors) that apply rules to your data, using the ones provided by the tool. You can include conditional tests (if ... then ... else ...) to apply rules based on the data content or the data source.
  • You can create different targets (or sinks) and write all the data, part of the data, or only the data matching some criteria to each of them.
  • You can have sources that fetch the data themselves by tailing files, reading a queue in some system, issuing GET/POST requests to some HTTP(S) server, ...
  • You can have sources that simply open a socket (TCP, UDP, Unix, ...) and wait until content is submitted to it by external senders (e.g. a Syslog listener that all your systems can use as a remote Syslog server).
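
The sketch below illustrates this common source / filter / sink model in plain Python (again tool-agnostic; all names and sample lines are made up): two tagged sources feed a conditional filter, and events are routed to two sinks depending on their content.

    def file_source():
        # Source 1: pretend these lines were tailed from a log file; tag their origin.
        for line in ["disk full on srv01", "backup finished on srv02"]:
            yield {"source": "file", "message": line}

    def socket_source():
        # Source 2: pretend these lines arrived on a listening socket.
        for line in ["user login from 10.0.0.5"]:
            yield {"source": "socket", "message": line}

    def apply_filter(event):
        # Filter: a conditional rule (if ... then ... else ...) based on the content.
        event["severity"] = "critical" if "disk full" in event["message"] else "info"
        return event

    def route(event, sinks):
        # Sinks: every event goes to the main sink, critical ones also to an alert sink.
        sinks["main"].append(event)
        if event["severity"] == "critical":
            sinks["alerts"].append(event)

    sinks = {"main": [], "alerts": []}
    for source in (file_source(), socket_source()):
        for event in source:
            route(apply_filter(event), sinks)

    print(len(sinks["main"]), "events routed,", len(sinks["alerts"]), "alert(s)")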

There are some notable differences between these 3 tools:

  • Apache NiFi is the only one built around a web GUI, where the whole configuration can be created or modified and where you can watch the messages flowing via queue counters. You can open any queue and inspect any message waiting in it.
  • Apache NiFi, via its GUI, allows you to stop each processor independently of the others, so you can inspect the queues and alter the connections between processors on the fly.
  • Apache NiFi also delivers a sub-project called MiNiFi, a lightweight version of NiFi that can run directly on the server whose logs you are interested in, putting NiFi closer to the source without a heavy impact on the host resources.
  • Logstash offers a very powerful and versatile extraction filter, called grok, that can, in a single line, extract values from a string and create and populate new fields from them. Logstash ships with various pre-defined grok patterns able to recognize and parse well-known log formats like Syslog messages, Apache access log lines, ... With only one line of configuration you can extract each meaningful token and populate a named field with it (the sketch after this list illustrates the idea).
  • Next to Logstash, the Elastic Stack also delivers a lightweight framework called Beats. Beats are lightweight data shippers, each usually targeting one kind of source, with very minimal filtering and target possibilities. Currently, there are Beats to get information from:
    • Log files (following log files like tail)
    • System resources (information about CPU, load, memory, disk space, network and process usage)
    • Dedicated application resources (Kafka, Apache, MySQL, MongoDB, ...)
    • Information about the Icinga monitoring server (service & host states, notifications, acknowledgments, server status, cluster status, ...)
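
As an illustration of what grok-style extraction does (grok expressions essentially compile down to regular expressions with named captures), here is a small Python sketch that pulls named fields out of a syslog-like line. The pattern, sample line and field names are illustrative assumptions, not an actual Logstash grok pattern.

    import re

    # Illustrative pattern for a syslog-like line; each named group becomes a field,
    # which is roughly what a grok expression such as %{SYSLOGTIMESTAMP:timestamp}
    # does for the piece of the line it matches.
    SYSLOG_LIKE = re.compile(
        r"(?P<timestamp>\w{3} +\d+ [\d:]{8}) "
        r"(?P<host>\S+) "
        r"(?P<program>[\w./-]+)(?:\[(?P<pid>\d+)\])?: "
        r"(?P<message>.*)"
    )

    line = "Jun  3 09:12:45 srv01 sshd[1234]: Accepted password for alice"
    match = SYSLOG_LIKE.match(line)
    if match:
        fields = match.groupdict()  # e.g. {'host': 'srv01', 'program': 'sshd', ...}
        print(fields["host"], fields["program"], fields["message"])

The difference with Logstash is that the grok filter gives you the named fields in one configuration line, without writing the regular expression yourself.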


Feature-comparison matrix for the 3 tools

The matrix compares Logstash 5.x, Flume 1.5.2, Flume 1.7.0 and NiFi 1.2.0 on the sources, targets (sinks) and filters (transformations / interceptors) they support. The features compared are listed below per group.

Sources
  Amazon Simple Queue
  AMQP messaging
  Avro
  AWS Kinesis
  Azure Event Hub
  Cassandra query
  CloudWatch
  CouchBase
  Custom source
  Drupal Dblog
  DynamoDB
  ElasticSearch
  Exec command
  File (read & del)
  File (tail)
  Flume Sources
  FTP
  Ganglia
  GELF (Graylog)
  GemFire
  Graphite
  HDFS
  Heroku
  HTTP (listen) 
  Ignite cache
  IMAP
  IRC
  JMS
  JMX
  Kafka 0.10
  Kafka 0.8
  Kafka 0.9
  Log4j Socket Appender
  LumberJack (listen)
  MongoDB
  MQTT
  POP3
  Puppet server
  RabbitMQ
  Raw socket (listen)
  Redis
  RELP (listen)
  RSS
  S3
  Salesforce
  Scribe
  Self-generator
  SFTP
  SMTP (listen)
  SNMP
  SolR
  Splunk
  SQL query
  STOMP
  Syslog (listen)
  Thrift
  Twitter
  Unix socket
  Varnish log
  Web server
  Windows event log
  WMI query
  XMPP
  Zenoss
  ZeroMQ
Targets (sinks)
  Amazon Simple Queue
  Amazon Kinesis
  Amazon Lambda
  Amazon S3
  Amazon Simple Notification
  AMQP messaging
  Avro
  Azure Event Hub
  Boundary
  Cassandra query
  Circonus
  CloudWatch
  CouchBase
  Custom targets
  DataDogHQ
  Distributed Map Cache
  DynamoDB
  ElasticSearch
  Email
  Flume Sinks
  FTP
  Ganglia
  Google BigQuery
  Google Cloud Storage
  Graphite
  Graylog GELF
  HBase
  HBase (asynchronous)
  HDFS
  HipChat
  Hive
  HTTP
  Ignite cache
  InfluxDB
  IRC
  JIRA
  JMS
  Juggernaut
  Kafka 0.10
  Kafka 0.8
  Kafka 0.9
  Kite Dataset
  Librato
  Local file
  Loggly
  LumberJack
  MetricCatcher
  MongoDB
  MQTT
  Nagios
  New Relic
  Null (discard)
  OpenTSDB
  Pipe
  RabbitMQ
  RackSpace Cloud Queue
  Redis
  Redmine
  Riak
  Riemann
  Send SQL
  Send to TCP / UDP
  SFTP
  Slack
  SolR
  Statsd (for metrics)
  STOMP
  Syslog
  Thrift
  WebSocket
  XMPP
  Zabbix
  ZeroMQ
Filters (transformations / interceptors)
  Aggregation of content
  Alter content
  Alter metadata (fields, attr.)
  Base64 encode
  Cassandra lookup
  Compress content
  Convert CSV -> Avro
  Convert JSON <-> Avro
  Convert JSON -> SQL
  Create HTML DOM
  DNS lookup
  Duplicate detection
  Encrypt
  Extract email headers
  Extract image headers
  Extract media headers
  GeoIP
  GROK extractor
  Hash value
  Insert hostname
  Insert static value
  Insert timestamps
  Insert UUID
  Jolt transformation on JSON
  Morphline
  Parse CEF
  Parse Syslog event
  Parse Windows Event
  Regex extractor
  Regex filter
  Replace with dictionary
  Search & replace
  Split of event
  Text routing
  Web UserAgent decode
  WHOIS query
  XSLT transform of XML
  Yandex translation