Last Updated: 2024-10-11
This is the second tutorial in the Python processor basics series. In the previous lesson, Build a NiFi Python transform processor, you learned how to structure, implement, and deploy a custom FlowFileTransform processor. This tutorial will help you understand how to use the logger and how to debug the most common issues.
In this tutorial, you will learn how to make full use of the application logger. You will discover how to access log entries in the Datavolo interface, and you will become familiar with the most common issues and effective strategies for resolving them.
This tutorial is part of a series whose end goal is a full ETL pipeline that loads data into a database.
Once you've completed the tutorial, you will be able to:
Use the application logger from a custom Python processor
Access and interpret log entries in the Datavolo UI
Recognize and resolve the most common packaging and deployment issues
Accessing logs is crucial for troubleshooting issues and monitoring the performance of your custom processors in a Datavolo runtime. This section will guide you through the steps to access and interpret these logs effectively.
The framework provides every instance of a processor with a logger. It is important to understand that the logger instance provided is not a Python logger, but a proxy object communicating with a Java logger interface. Due to this, the available methods are different; most notably, Python's warning log level is represented by the warn method.
Log levels available are:
trace
debug
info
warn
error
Similarly to Python, log entries will only be created if they match or exceed the log level set as a threshold. More on this later.
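To make this concrete, here is a minimal sketch of the logger being used at each level inside a transform method; the method body and messages are illustrative only. Note the warn method in place of Python's warning:

def transform(self, context, flow_file):
    # self.logger is injected by the framework and proxies to a Java logger
    self.logger.trace("Very fine-grained diagnostic output")
    self.logger.debug("Details useful during development")
    self.logger.info("Routine operational messages")
    self.logger.warn("Recoverable problems (Python's 'warning' equivalent)")
    self.logger.error("Failures that require attention")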
Let's put what we learned in the previous section into practice. In the following example, you will write the comma-separated representation of the RETURN_SCHEMA to the logs.
Add the following line above contents = json.loads(flow_file.getContentsAsBytes()).
self.logger.info(f"Return Schema: {', '.join(RETURN_SCHEMA)}")
Additionally, let's log the number of entries available in the states list.
Add the following line below the contents variable definition.
self.logger.debug(f"Length of states list: {len(contents['states'])}")
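Putting both additions together, the top of the transform method should now look roughly like this (a sketch that assumes the RETURN_SCHEMA constant and method structure from the previous tutorial):

def transform(self, context, flow_file):
    # Log the expected output schema before parsing the incoming content
    self.logger.info(f"Return Schema: {', '.join(RETURN_SCHEMA)}")
    contents = json.loads(flow_file.getContentsAsBytes())
    # Log how many entries the parsed payload contains
    self.logger.debug(f"Length of states list: {len(contents['states'])}")
    ...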
Before packaging and deploying the code, it is important to understand the version parameter of a processor, another takeaway from the previous tutorial. In essence, every time a new version of a processor is created, a new environment is created for it, keeping environments and their dependencies separate.
Adding a new version of a processor will not automatically affect instances within the dataflow; they will need to be updated manually. With this in mind, you can update the version of the processor by incrementing the existing value in ProcessorDetails.
class ProcessorDetails:
    version = '0.0.2-SNAPSHOT'
Similarly, you can update the version of the Hatch processor bundle. Information about the current version of the package can be found in the __about__.py file, located in the processors/src/processors directory.
Adjust the project version.
__version__ = "0.0.2"
The processor is now ready to be packaged and deployed. More information on these steps can be found in the previous tutorial.
If you followed the previous tutorial, you will have a dataflow that looks something like the one below.
Notice that simply uploading a new version of the processor does not update the existing flow.
With multiple versions of a processor available, Datavolo's UI offers a quick and efficient way to manage them. This feature allows users to easily switch between different versions, ensuring that the most suitable processor version is in use for their specific needs.
To upgrade the processor, select the Change Version item from the processor's contextual menu.
All available snapshots are listed in the Version dropdown. Select version 0.0.2-SNAPSHOT and click Apply.
Time to clean up and remove the outdated build. Navigate to the Local Extensions tab of Controller Settings and delete the old package.
Created log entries can be accessed directly through the Datavolo UI. Initially, running the processor might not yield any log entries visible in the UI. This is because, by default, newly added processors have their Bulletin Level set to WARN, meaning that INFO and DEBUG entries will not surface as bulletins.
You can change the Bulletin Level in the Settings tab of the Edit Processor dialog.
Update Bulletin Level to DEBUG for the processor.
Remember to Apply your changes.
Hovering over this bulletin indicator will render a new pop-up to the right listing the latest log entries.
Keep in mind that the bulletin indicator will also appear for the processor group.
Removing the package does not automatically remove associated processors. However, they will be surrounded by a dashed border (often referred to as a ghost), which indicates that the target NAR is missing.
You can verify this by hovering over the yellow triangle. You should see a Missing Processor error message.
Fortunately, the Datavolo runtime restart sequence attempts to resolve any ghost processors that may be present. To initiate this, select Runtimes from the navigation drawer in the upper right of the UI.
Select Restart from the vertical ellipsis options that surface to the far right of your runtime.
This action refreshes the environment and can resolve issues related to processor updates or other runtime anomalies.
Once your runtime has been restarted, you can replace the old processor by following these steps.
You can verify that the new version of the processor is being used by checking its subtitle.
From Local Extensions you can confirm whether a processor was successfully loaded. Click the vertical ellipsis to the right of it and choose View Extension Types from the contextual menu.
In the Extension Types pop-up window you should see the name and version of the loaded processor.
If a new Hatch project was started using the hatch new command, it is possible that the pyproject.toml file is missing a tool.hatch.build.targets.nar section.
When composing a NAR, the build command needs to be aware of the locations of resources that should be included in the final bundle. This information is represented as a list of locations.
Here is an example of a valid pyproject.toml file.
[build-system]
requires = ["hatchling", "hatch-datavolo-nar"]
build-backend = "hatchling.build"

[project]
name = "processors"
dynamic = ["version"]
dependencies = []

[tool.hatch.version]
path = "src/processors/__about__.py"

[tool.hatch.build.targets.nar]
packages = ["src/processors"]
If your processor references an external package, ensure that it is listed in both the dependencies parameter of the ProcessorDetails class and the pyproject.toml file.
Here is an example of how to configure a processor using the requests library.
class Processor(FlowFileTransform):
    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''
        An example processor utilizing the requests library.
        '''
        tags = ["example", "tutorial"]
        dependencies = ["requests==2.32.3"]
Importantly, the same dependency should be listed in the pyproject.toml file located in the Hatch package.
[project]
dependencies = [
    "requests"
]
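With the dependency declared in both places, it is installed into the processor's environment and can be imported as usual. Here is a minimal sketch of how it might then be used inside transform, with a hypothetical endpoint URL:

import requests

def transform(self, context, flow_file):
    # Available because requests is declared in ProcessorDetails.dependencies
    # and in pyproject.toml
    response = requests.get("https://example.com/data")  # hypothetical endpoint
    self.logger.info(f"Fetched {len(response.content)} bytes")
    ...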
You can verify that the package was built with the dependency by inspecting the console output generated by the hatch build command.
Congratulations, you've completed the Error handling in NiFi Python processors tutorial!