NiFi — Writing Processors
In this article, we look at Processors, one of NiFi's core concepts, in reasonable depth. We cover the design of the Processor framework and how to program new processors. The article assumes some familiarity with NiFi and its UI.
What are Processors
A Processor can be thought of as a function that takes input, produces output, and accepts configuration. NiFi ships with a standard list of processors that covers a wide range of connectors and functions. Additional processors are contributed by other open-source project teams and are available without additional installation; examples include processors for working with AWS and Azure services. Still, there may be a need to write our own processors.
Official Definition: Processors actually perform the work. A processor is doing some combination of data routing, transformation, or mediation between systems. Processors have access to attributes of a given FlowFile and its content stream. Processors can operate on zero or more FlowFiles in a given unit of work and either commit that work or rollback.
Processors are typically connected in a DAG (directed acyclic graph) fashion to accomplish an objective such as a data pipeline. The same output from a processor can be sent to multiple processors, and each pair of connected processors is linked by a queue.
Flow File
Messages that are exchanged between processors are referred to as Flow Files. A Flow File flows from one processor to another via a queue. A Flow File has content (zero or more bytes) along with metadata, referred to as attributes (key-value pairs).
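To make the content-plus-attributes shape concrete, here is a minimal plain-Java sketch. It is a simplified stand-in and not NiFi's own FlowFile interface; the class name SimpleFlowFile, the withAttribute helper, and the sample attribute values are assumptions made for illustration. The sketch returns a new object whenever an attribute changes, on the assumption that a Flow File is best treated as an immutable value.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Simplified stand-in for illustration only; NOT the NiFi FlowFile interface.
final class SimpleFlowFile {
    private final byte[] content;                 // zero or more bytes
    private final Map<String, String> attributes; // metadata as key-value pairs

    SimpleFlowFile(byte[] content, Map<String, String> attributes) {
        this.content = content.clone();
        this.attributes = Collections.unmodifiableMap(new LinkedHashMap<>(attributes));
    }

    byte[] content() {
        return content.clone();
    }

    Map<String, String> attributes() {
        return attributes;
    }

    // Returns a new instance with one attribute added or replaced; the original is unchanged.
    SimpleFlowFile withAttribute(String key, String value) {
        Map<String, String> copy = new LinkedHashMap<>(attributes);
        copy.put(key, value);
        return new SimpleFlowFile(content, copy);
    }

    public static void main(String[] args) {
        // Hypothetical sample data.
        SimpleFlowFile original = new SimpleFlowFile(
                "{\"id\": 1}".getBytes(StandardCharsets.UTF_8),
                Collections.singletonMap("filename", "order.json"));
        SimpleFlowFile tagged = original.withAttribute("mime.type", "application/json");
        System.out.println(tagged.attributes());
        System.out.println(tagged.content().length + " bytes");
    }
}
```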
Let’s look at each of these aspects of a processor in detail:
- Input
- Output
- Configuration
- Processing
Input
The input to a processor can come from another processor upstream as a Flow File.
Alternatively, the processor can read input from an external system such as a messaging service (Kafka, Azure Event Hub, etc.), a database, or a file server. Sample processor names include ConsumeKafka and ConsumeAzureEventHub.
Output
The output from a processor is represented as a Flow File, with content and with metadata represented as attributes. A processor can output multiple Flow Files in a single run.
Relationships
The output from a processor can be sent to one or more routes, which NiFi calls relationships. For example, a ‘Message Validator’ processor can, after validation, send valid messages to the SUCCESS relationship and invalid messages to the FAILURE relationship.
As another example, say a system accepts input messages in three different formats, each of which should be handled by a separate message handler. In this case, a custom Processor can receive a Flow File, figure out which format/version it is, and send it to one of three relationships (v1, v2, v3). The message handlers can be designed as downstream processors, with each handler dealing with one message format or version.
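The routing decision in this example can be sketched in plain Java, kept separate from any NiFi classes. This is only an illustration: the JSON "version" field, the VersionRouter class, and the extra "failure" route for unrecognised input are assumptions that are not part of the scenario above. In a real processor, the chosen name would correspond to a Relationship object handed to ProcessSession.transfer.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Plain-Java sketch of the routing decision; the "version" field is a hypothetical marker.
final class VersionRouter {
    // Matches a JSON-style field such as "version": 2
    private static final Pattern VERSION = Pattern.compile("\"version\"\\s*:\\s*(\\d+)");

    // Returns the name of the relationship the message should be routed to.
    static String route(byte[] content) {
        Matcher matcher = VERSION.matcher(new String(content, StandardCharsets.UTF_8));
        if (!matcher.find()) {
            return "failure"; // no version marker found
        }
        switch (matcher.group(1)) {
            case "1": return "v1";
            case "2": return "v2";
            case "3": return "v3";
            default:  return "failure"; // unknown version
        }
    }

    public static void main(String[] args) {
        byte[] message = "{\"version\": 2, \"body\": \"hello\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(route(message));
    }
}
```

Keeping the decision in a small pure function like this makes it easy to unit test without starting NiFi.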
The relationships supported by a processor are declared in the Processor class (which typically extends AbstractProcessor) as a set of Relationship objects.
Configuration
A processor needs configuration to be able to exercise its different behaviours or to connect to an external system. For example, to connect to the Azure Event Hub messaging system, authentication and endpoint information are necessary; they are passed as processor properties.
The properties that are required or supported by a Processor are declared as PropertyDescriptors. A PropertyDescriptor object has fields including name, description, required, and sensitive. The web application uses the list of property descriptors coded in the processor to display its properties.
The property values that the user sets on a Processor instance can optionally be validated by overriding the customValidate() method of the Processor's ancestor, AbstractConfigurableComponent.
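The idea of declaring properties and then validating what the user entered can be sketched with a small plain-Java model. This is not NiFi's PropertyDescriptor or validation API: the Descriptor class only mirrors the fields named above, and the property names and the proxy rule are hypothetical. The sketch assumes that custom validation is a natural home for rules spanning more than one property, while simple per-property checks such as "required" are handled separately.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Plain-Java model for illustration; NOT NiFi's PropertyDescriptor or validation API.
final class PropertyModel {
    static final class Descriptor {
        final String name;
        final String description;
        final boolean required;
        final boolean sensitive;

        Descriptor(String name, String description, boolean required, boolean sensitive) {
            this.name = name;
            this.description = description;
            this.required = required;
            this.sensitive = sensitive;
        }
    }

    // Hypothetical properties for a messaging connector.
    static final List<Descriptor> DESCRIPTORS = Arrays.asList(
            new Descriptor("Endpoint", "Service endpoint to connect to", true, false),
            new Descriptor("Access Key", "Secret used to authenticate", true, true),
            new Descriptor("Proxy Host", "Optional proxy host name", false, false),
            new Descriptor("Proxy Port", "Optional proxy port", false, false));

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    // Per-property check: every required property must have a non-blank value.
    static List<String> validateRequired(Map<String, String> configured) {
        List<String> problems = new ArrayList<>();
        for (Descriptor descriptor : DESCRIPTORS) {
            if (descriptor.required && isBlank(configured.get(descriptor.name))) {
                problems.add(descriptor.name + " is required");
            }
        }
        return problems;
    }

    // Custom check spanning two properties: proxy host and port must be set together.
    static List<String> customValidate(Map<String, String> configured) {
        List<String> problems = new ArrayList<>();
        boolean hasHost = !isBlank(configured.get("Proxy Host"));
        boolean hasPort = !isBlank(configured.get("Proxy Port"));
        if (hasHost != hasPort) {
            problems.add("Proxy Host and Proxy Port must be set together");
        }
        return problems;
    }
}
```

An empty list from both methods means the configuration in this model is valid; each entry otherwise describes one problem that could be shown to the user.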
Processing
NiFi supports several lifecycle operations. A Processor implements them by marking methods with the appropriate lifecycle annotations; a comprehensive list of annotations is available in the NiFi documentation.
- @OnScheduled: called when the processor is scheduled to run
- @OnStopped: called when the processor is stopped from the UI
- @OnShutdown: called when the NiFi system is shutting down
A processor that is connected to another processor upstream receives FlowFiles in its onTrigger method. For example, EvaluateJsonPath receives a Flow File from upstream and checks for the expected JSON paths in the Flow File's JSON content.
A processor can also read messages from an external system such as Kafka, convert each message received into a FlowFile, and pass it on to one or more relationships to be consumed downstream. Examples include ConsumeKafka and ConsumeAzureEventHub.
The onTrigger method of a Processor is invoked when there is work to do, and every processor has to implement it. The onTrigger(ProcessContext, ProcessSession) implementation uses the ProcessSession to get hold of the input Flow File. The configuration (properties) set in the UI is obtained from the ProcessContext, which provides a bridge between the Processor and the NiFi framework.
The steps to produce one or more Flow Files from a Processor are:
- Create a flow file: using ProcessSession.create
- Add content to the flow file: using input/output streams or records, via ProcessSession.write
- Add attributes: using ProcessSession.putAttribute
- Transfer the flow file to a relationship: using ProcessSession.transfer
- Commit the session: using ProcessSession.commit (for processors that extend AbstractProcessor, the framework commits the session once onTrigger returns)
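The order of these steps, and why the commit matters, can be illustrated with a small in-memory model. This is not the NiFi ProcessSession API: MiniSession, Message, and queued are hypothetical names, and the method signatures are simplified. The attribute step is called putAttribute here, after NiFi's ProcessSession.putAttribute. The point of the sketch is that transferred work stays pending until the session is committed, and a rollback discards it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// In-memory model of the create / write / attribute / transfer / commit steps.
// Illustration only; it is NOT the NiFi ProcessSession API.
final class MiniSession {
    static final class Message {
        byte[] content = new byte[0];
        final Map<String, String> attributes = new LinkedHashMap<>();
    }

    private final Map<String, List<Message>> pending = new HashMap<>();
    private final Map<String, List<Message>> committed = new HashMap<>();

    Message create() {
        return new Message();
    }

    void write(Message message, String text) {
        message.content = text.getBytes(StandardCharsets.UTF_8);
    }

    void putAttribute(Message message, String key, String value) {
        message.attributes.put(key, value);
    }

    // Transfers are only recorded as pending until the session is committed.
    void transfer(Message message, String relationship) {
        pending.computeIfAbsent(relationship, k -> new ArrayList<>()).add(message);
    }

    // Makes all pending transfers visible to the downstream queues.
    void commit() {
        for (Map.Entry<String, List<Message>> entry : pending.entrySet()) {
            committed.computeIfAbsent(entry.getKey(), k -> new ArrayList<>()).addAll(entry.getValue());
        }
        pending.clear();
    }

    // Discards all pending transfers.
    void rollback() {
        pending.clear();
    }

    // Messages that downstream consumers of this relationship can see.
    List<Message> queued(String relationship) {
        return committed.getOrDefault(relationship, new ArrayList<>());
    }

    public static void main(String[] args) {
        MiniSession session = new MiniSession();
        Message message = session.create();
        session.write(message, "hello");
        session.putAttribute(message, "source", "demo");
        session.transfer(message, "success");
        session.commit();
        System.out.println(session.queued("success").size());
    }
}
```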
Logging during processing can be done using the ComponentLog available to the Processor from getLogger(). This ensures that logging from all Processors happens in a consistent way.
Typical skeleton of a Processor implementation:
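class SampleProcessor extends AbstractProcessor {
    public Set<Relationship> getRelationships() { ... }
    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { ... }
    protected Collection<ValidationResult> customValidate(ValidationContext context) { ... }
    public void onTrigger(ProcessContext context, ProcessSession session) { ... }
    @OnScheduled
    public void onScheduled() { ... }
    @OnStopped
    public void close() { ... }
    @OnUnscheduled
    public void stopConnectionRetainer() { ... }
}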
Wrapping Up
I hope this was a decent introduction to NiFi Processors and related concepts, and that it gives you enough confidence to venture into writing new processors. I suggest looking at the source code of a few processors to build familiarity and infer best practices.