NiFi — Writing Processors

In this article, we look at Processors, one of NiFi's core concepts, in reasonable depth. We cover the design of the Processor framework and how to program new processors. The article assumes readers have some familiarity with NiFi and its UI.

What are Processors

A Processor can be thought of as a function that takes input, produces output, and accepts configuration. NiFi ships with a standard set of processors that covers a wide range of connectors and functions. Additional processors are contributed by other open-source project teams and are available without separate installation; examples include processors for AWS and Azure services. Still, there may be a need to write our own processors.

Official Definition: Processors actually perform the work. A processor is doing some combination of data routing, transformation, or mediation between systems. Processors have access to attributes of a given FlowFile and its content stream. Processors can operate on zero or more FlowFiles in a given unit of work and either commit that work or rollback.

Processors are typically connected as a DAG (directed acyclic graph) to accomplish some objective, such as a data pipeline. The same output from a processor can be sent to multiple processors, and each pair of connected processors is linked by a queue.

Flow File

Messages exchanged between processors are referred to as Flow Files. A Flow File flows from one processor to another via a queue. A Flow File has content (zero or more bytes) along with metadata, referred to as attributes (key-value pairs).
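To make the "content plus attributes" idea concrete, here is a minimal plain-Java sketch. The class SimpleFlowFile and its methods are hypothetical stand-ins written for this article, not NiFi's own FlowFile type; the attribute key and the JSON payload are only example values. The sketch treats the object as immutable, so adding an attribute yields a new object.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for illustration only; this is NOT NiFi's FlowFile interface.
final class SimpleFlowFile {
    private final byte[] content;                 // zero or more bytes
    private final Map<String, String> attributes; // key-value metadata

    SimpleFlowFile(byte[] content, Map<String, String> attributes) {
        this.content = content.clone();
        this.attributes = Collections.unmodifiableMap(new HashMap<String, String>(attributes));
    }

    byte[] getContent() {
        return content.clone();
    }

    Map<String, String> getAttributes() {
        return attributes;
    }

    // Returns a new object carrying the extra attribute; the original is left unchanged.
    SimpleFlowFile withAttribute(String key, String value) {
        Map<String, String> copy = new HashMap<String, String>(attributes);
        copy.put(key, value);
        return new SimpleFlowFile(content, copy);
    }

    public static void main(String[] args) {
        byte[] json = "{\"id\": 1}".getBytes(StandardCharsets.UTF_8);
        SimpleFlowFile original = new SimpleFlowFile(json, new HashMap<String, String>());
        SimpleFlowFile tagged = original.withAttribute("mime.type", "application/json");

        System.out.println(original.getAttributes().size());         // prints 0
        System.out.println(tagged.getAttributes().get("mime.type")); // prints application/json
    }
}
```

In a real processor, Flow Files are never constructed directly like this; they are obtained from and modified through the session that the framework passes in.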

Sample flow file as seen in UI (Note Attributes tab and option to View content of Flow File)

Let’s look at each of these aspects of a processor in detail:

  • Input
  • Output
  • Configuration
  • Processing

Input

The input to a processor can come from another processor upstream as a Flow File.

Alternatively, the processor can read input from an external system such as a messaging service (Kafka, Azure Event Hub, etc.), a database, or a file server. Examples of such processors include ConsumeKafka and ConsumeAzureEventHub.

Output

The output from a processor is represented as a Flow File, with content and with metadata represented as attributes. A processor can output multiple Flow Files in a single run.

Relationships

The output from a processor can be sent to one or more routes, which NiFi calls relationships. For example, a ‘Message Validator’ processor can send valid messages to the SUCCESS relationship and invalid messages to the FAILURE relationship.

As another example, say a system supports input messages in three different formats, each of which should be handled by a separate message handler. In this case, a custom Processor can receive a Flow File, figure out which format/version it is, and send it to one of three relationships (v1, v2, v3). The message handlers can be designed as downstream processors, each handling one message format or version.

The relationships supported by a processor are declared in the Processor class (which typically extends AbstractProcessor) as a set of Relationship objects, returned from getRelationships().
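The version-routing example can be sketched in plain Java as follows. This is an illustration of the routing decision only, under stated assumptions: relationships are modelled as plain strings rather than NiFi Relationship objects, the attribute name "message.version" is hypothetical, and sending unrecognised versions to a "failure" relationship is a design choice made for this sketch.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Plain-Java sketch of version routing; relationship names are modelled as strings.
final class VersionRouter {
    static final String FAILURE = "failure";

    // The per-version relationships this hypothetical processor would declare.
    static final Set<String> VERSION_RELATIONSHIPS = Collections.unmodifiableSet(
            new LinkedHashSet<String>(Arrays.asList("v1", "v2", "v3")));

    // Picks a relationship name from the hypothetical "message.version" attribute.
    static String route(Map<String, String> attributes) {
        String version = attributes.get("message.version");
        if (version == null) {
            return FAILURE; // nothing to route on
        }
        String candidate = "v" + version.trim();
        return VERSION_RELATIONSHIPS.contains(candidate) ? candidate : FAILURE;
    }

    public static void main(String[] args) {
        Map<String, String> attributes = new HashMap<String, String>();
        attributes.put("message.version", "2");
        System.out.println(route(attributes)); // prints v2

        attributes.put("message.version", "9");
        System.out.println(route(attributes)); // prints failure
    }
}
```

In an actual processor, each name would correspond to a Relationship object in the declared set, and the chosen relationship would be the target of the transfer call.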

Routing and processing of multiple version messages

Configuration

A processor needs configuration to be able to exercise its multiple behaviours or to connect to an external system. For example, to connect to Azure Event Hub messaging system, authentication and endpoint information are necessary; they are passed as processor properties.

The properties that are required or supported by a Processor are declared as PropertyDescriptors. A PropertyDescriptor object has fields including name, description, required and sensitive. The web application uses the list of property descriptors coded in the processor to display its properties.

ConsumeAzureEventHub Processor Properties

The property values that a user sets on a Processor instance can optionally be validated by overriding the customValidate() method of AbstractConfigurableComponent, an ancestor of AbstractProcessor.
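The relationship between declared properties and validation can be sketched in plain Java. Everything here is a hypothetical stand-in: PropertySpec mirrors only the four fields mentioned above and is not NiFi's PropertyDescriptor, the property names are invented, and validate() only imitates the spirit of customValidate() by returning one message per problem.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for illustration only; NOT NiFi's PropertyDescriptor.
final class PropertySpec {
    final String name;
    final String description;
    final boolean required;
    final boolean sensitive;

    PropertySpec(String name, String description, boolean required, boolean sensitive) {
        this.name = name;
        this.description = description;
        this.required = required;
        this.sensitive = sensitive;
    }

    // Hypothetical properties for a processor that connects to a messaging system.
    static final List<PropertySpec> SUPPORTED = Collections.unmodifiableList(Arrays.asList(
            new PropertySpec("Endpoint", "Address of the messaging service", true, false),
            new PropertySpec("Access Key", "Secret used to authenticate", true, true),
            new PropertySpec("Consumer Group", "Optional consumer group name", false, false)));

    // Returns one message per missing required property; an empty list means valid.
    static List<String> validate(Map<String, String> configured) {
        List<String> problems = new ArrayList<String>();
        for (PropertySpec spec : SUPPORTED) {
            String value = configured.get(spec.name);
            if (spec.required && (value == null || value.trim().isEmpty())) {
                problems.add("'" + spec.name + "' is required");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> configured = new HashMap<String, String>();
        configured.put("Endpoint", "sb://example.invalid");
        System.out.println(validate(configured)); // prints ['Access Key' is required]
    }
}
```

A typical use of custom validation in a real processor is to check combinations of properties, for example that two properties which depend on each other are both set.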

Processing

NiFi supports several lifecycle operations. A Processor implements them by annotating methods with the appropriate annotations; the comprehensive list of annotations is documented in the NiFi Developer's Guide.

  • @OnScheduled: called when the processor is scheduled to run
  • @OnStopped: called when the processor is stopped, for example from the UI
  • @OnShutdown: called when the NiFi system is shutting down

A processor that is connected to another processor upstream receives a Flow File through the onTrigger method, described below. For example, EvaluateJsonPath receives a Flow File from upstream and evaluates the configured JSON paths against its JSON content.

A processor can also read messages from an external system such as Kafka, convert each message received into a FlowFile, and pass it on to one or more relationships to be consumed downstream. Examples include ConsumeKafka and ConsumeAzureEventHub.

The onTrigger method of a Processor is invoked when there is work to do, and every processor has to implement it. The onTrigger(ProcessContext, ProcessSession) implementation uses the ProcessSession to get hold of the input Flow File. The configuration (properties) set in the UI is obtained from the ProcessContext, which provides a bridge between the Processor and the NiFi framework.

The following are the steps to produce one or more Flow Files from a Processor:

  • Create flow file: using ProcessSession.create
  • Add content to flow file: using input/output streams or records, via ProcessSession.write
  • Add attributes: using ProcessSession.putAttribute
  • Transfer flow file to relationship: using ProcessSession.transfer
  • Commit session: using ProcessSession.commit (a processor that extends AbstractProcessor has its session committed by the framework when onTrigger returns)
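The sequence above can be illustrated with a toy, in-memory session. This is only a sketch under stated assumptions: ToySession and Pending are hypothetical classes written for this article, not NiFi's ProcessSession or FlowFile, and they model just one idea, namely that created and transferred Flow Files reach the downstream queue only when the session is committed. The method names follow the real ones (create, write, putAttribute, transfer, commit, rollback), but the signatures are simplified.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory stand-in for illustration only; NOT NiFi's ProcessSession.
final class ToySession {
    // A Flow File that has been created in this session but not yet committed.
    static final class Pending {
        byte[] content = new byte[0];
        final Map<String, String> attributes = new HashMap<String, String>();
        String relationship; // set by transfer()
    }

    private final List<Pending> uncommitted = new ArrayList<Pending>();
    private final Map<String, List<Pending>> queues = new HashMap<String, List<Pending>>();

    Pending create() {
        Pending p = new Pending();
        uncommitted.add(p);
        return p;
    }

    void write(Pending p, byte[] content) { p.content = content.clone(); }

    void putAttribute(Pending p, String key, String value) { p.attributes.put(key, value); }

    void transfer(Pending p, String relationship) { p.relationship = relationship; }

    // Work becomes visible downstream only here; every Flow File must have a destination.
    void commit() {
        for (Pending p : uncommitted) {
            if (p.relationship == null) {
                throw new IllegalStateException("Flow File was created but never transferred");
            }
        }
        for (Pending p : uncommitted) {
            List<Pending> queue = queues.get(p.relationship);
            if (queue == null) {
                queue = new ArrayList<Pending>();
                queues.put(p.relationship, queue);
            }
            queue.add(p);
        }
        uncommitted.clear();
    }

    // Discards everything done since the last commit.
    void rollback() { uncommitted.clear(); }

    int queued(String relationship) {
        List<Pending> queue = queues.get(relationship);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        ToySession session = new ToySession();
        Pending flowFile = session.create();
        session.write(flowFile, "hello".getBytes(StandardCharsets.UTF_8));
        session.putAttribute(flowFile, "source", "demo");
        session.transfer(flowFile, "success");

        System.out.println(session.queued("success")); // prints 0 (not committed yet)
        session.commit();
        System.out.println(session.queued("success")); // prints 1
    }
}
```

In NiFi itself, content is written through stream callbacks rather than a byte array, and the transfer target is a Relationship object rather than a string.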

Logging during processing can be done using the ComponentLog that is available to the Processor from getLogger(). This ensures that logging from all Processors happens in a consistent way.

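A typical skeleton of a Processor implementation, with method bodies elided:

public class SampleProcessor extends AbstractProcessor {
    @Override  // routes this processor can send Flow Files to
    public Set<Relationship> getRelationships() { ... }
    @Override  // properties displayed in the UI
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { ... }
    @Override  // optional validation of the configured properties
    protected Collection<ValidationResult> customValidate(ValidationContext context) { ... }

    @Override  // invoked by the framework when there is work to do
    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { ... }

    @OnScheduled  // set up resources, e.g. open connections
    public void onScheduled(ProcessContext context) { ... }

    @OnStopped  // release resources
    public void close() { ... }

    @OnUnscheduled  // called when the processor is no longer scheduled to run
    public void stopConnectionRetainer() { ... }
}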

Wrapping Up

I believe this was a decent introduction to NiFi Processors and related concepts, and that it gives enough confidence to venture into writing new processors. I suggest looking at the source code of a few processors to build familiarity and infer best practices.
