Last Updated: 2024-10-11

Background

This is the second tutorial in the Python processor basics series. In the previous lesson, Build a NiFi Python transform processor, you learned how to structure, implement, and deploy a custom FlowFileTransform processor. This tutorial shows you how to use the logger and how to debug the most common issues.

Scope of the tutorial

In this tutorial, you will learn how to write log entries from a custom processor using the application logger.

You will also learn how to access those log entries in the Datavolo interface, how to recognize the most common issues, and how to resolve them.

This tutorial is part of a series that works toward a full ETL pipeline that loads data into a database.

Learning objectives

Once you've completed the tutorial, you will be able to:

Prerequisites

Accessing logs is crucial for troubleshooting issues and monitoring the performance of your custom processors in a Datavolo runtime. This section will guide you through the steps to access and interpret these logs effectively.

Understanding logger

The framework provides every instance of a processor with a logger. It is important to understand that the logger instance provided is not a Python logger, but a proxy object communicating with a Java logger interface. Due to this, the available methods are different; most notably, Python's warning log level is represented by the warn method.

Log levels available are:

As in Python, a log entry is only created if its level matches or exceeds the level set as the threshold. More on this later.
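
The difference in method names can be illustrated outside of a runtime. The sketch below is not the framework's logger: StubLogger is a hypothetical stand-in that only assumes the proxy exposes debug, info, warn, and error methods, and log_at is a hypothetical helper that translates a Python-style level name such as warning into the method name the proxy is expected to offer.

```python
# Hypothetical stand-in for the logger proxy; the real object is supplied by
# the framework and forwards calls to a Java logger.
class StubLogger:
    def __init__(self):
        self.entries = []

    def debug(self, message):
        self.entries.append(("DEBUG", message))

    def info(self, message):
        self.entries.append(("INFO", message))

    def warn(self, message):
        self.entries.append(("WARN", message))

    def error(self, message):
        self.entries.append(("ERROR", message))


# Python-style level names mapped to the method names assumed on the proxy.
PYTHON_TO_PROXY = {"warning": "warn"}


def log_at(logger, level, message):
    """Call the logger method that matches a Python-style level name."""
    method_name = PYTHON_TO_PROXY.get(level.lower(), level.lower())
    getattr(logger, method_name)(message)


logger = StubLogger()
log_at(logger, "warning", "Input contained no states")
log_at(logger, "info", "Processing finished")
print(logger.entries)
```

Inside a processor you would call self.logger.warn(...) directly; the mapping is only useful when porting code that was written against Python's logging module.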

Writing to logs

Let's put what we learned in the previous section into practice. In the following example, you will write the comma-separated representation of the RETURN_SCHEMA to the logs.

Add the following line above contents = json.loads(flow_file.getContentsAsBytes()).

self.logger.info(f"Return Schema: {', '.join(RETURN_SCHEMA)}")

Additionally, let's log the number of entries available in the states list.

Add the following line below the contents variable definition.

self.logger.debug(f"Length of states list: {len(contents['states'])}")
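
To preview what these two statements produce without deploying anything, they can be run against stand-ins. In the sketch below, the RETURN_SCHEMA values, the sample payload, and the StubLogger, StubFlowFile, and Demo classes are all hypothetical placeholders; only the two logging statements and the json.loads call come from the steps above.

```python
import json

# Hypothetical schema; use the RETURN_SCHEMA from your own processor.
RETURN_SCHEMA = ["id", "name", "country"]


class StubLogger:
    """Hypothetical stand-in that records and prints messages."""

    def __init__(self):
        self.messages = []

    def info(self, message):
        self.messages.append(("INFO", message))
        print(f"INFO  {message}")

    def debug(self, message):
        self.messages.append(("DEBUG", message))
        print(f"DEBUG {message}")


class StubFlowFile:
    """Hypothetical stand-in exposing only getContentsAsBytes()."""

    def __init__(self, payload):
        self._payload = payload

    def getContentsAsBytes(self):
        return self._payload


class Demo:
    """Hypothetical class holding just the lines discussed above."""

    def __init__(self):
        self.logger = StubLogger()

    def transform(self, context, flow_file):
        self.logger.info(f"Return Schema: {', '.join(RETURN_SCHEMA)}")
        contents = json.loads(flow_file.getContentsAsBytes())
        self.logger.debug(f"Length of states list: {len(contents['states'])}")
        return contents


# Made-up payload with three entries in the states list.
sample = json.dumps({"states": [["a"], ["b"], ["c"]]}).encode("utf-8")
demo = Demo()
result = demo.transform(None, StubFlowFile(sample))
```

Note that the debug statement reads contents, so it has to come after the json.loads call, which is why the two lines are added at different positions.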

Versioning the processor

Before packaging and deploying the code, it is important to understand the version parameter of a processor, which was introduced in the previous tutorial. Every time a new version of a processor is created, a separate environment is created for it, which keeps each version and its dependencies isolated from the others.

Adding a new version of a processor will not automatically affect instances within the dataflow. They will need to be manually updated. With this in mind, you can update the version of the processor by incrementing the existing value in the ProcessorDetails class.

    class ProcessorDetails:
        version = '0.0.2-SNAPSHOT'
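
Incrementing the value by hand works well for a tutorial. For repeated releases, the same step can be sketched as a small helper. bump_patch is a hypothetical function, not part of the framework or Hatch, and it assumes versions of the form MAJOR.MINOR.PATCH with an optional suffix such as -SNAPSHOT.

```python
import re

# Assumed format: three numeric parts plus an optional "-SUFFIX".
VERSION_PATTERN = re.compile(r"^(\d+)\.(\d+)\.(\d+)(-[A-Za-z0-9.]+)?$")


def bump_patch(version):
    """Return the version with its patch number increased by one."""
    match = VERSION_PATTERN.match(version)
    if match is None:
        raise ValueError(f"Unsupported version format: {version!r}")
    major, minor, patch, suffix = match.groups()
    # Keep the suffix (for example -SNAPSHOT) unchanged if there is one.
    return f"{major}.{minor}.{int(patch) + 1}{suffix or ''}"


print(bump_patch("0.0.1-SNAPSHOT"))
print(bump_patch("0.0.1"))
```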

Versioning the package

Similarly, you can update the version of the Hatch processor bundle. Information about the current version of the package can be found in the __about__.py file, located in the processors/src/processors directory.

Adjust the project version.

__version__ = "0.0.2"

The processor is now ready to be packaged and deployed. More information on these steps can be found in the previous tutorial.

If you followed the previous tutorial, you will have a dataflow that looks something like the one below.

Notice that simply uploading a new version of the processor does not update the existing flow.

Change Version

With multiple versions of a processor available, the Datavolo UI lets you switch an existing processor instance from one version to another.

To upgrade the processor, select the Change Version item from the processor's contextual menu.

All available snapshots are listed in the Version dropdown. Select version 0.0.2-SNAPSHOT and click Apply.

Remove old processor

Time to clean up and remove the outdated build. Navigate to the Local Extensions tab of Controller Settings and delete the old package.

Created log entries can be accessed directly through the Datavolo UI. Initially, running the processor might not yield any log entries visible in the UI. This is because, by default, newly added processors have their Bulletin Level set to WARN, meaning that INFO and DEBUG entries will not be created.
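
The effect of the threshold can be sketched in a few lines. The ordering below (DEBUG, INFO, WARN, ERROR) and the is_visible helper are illustrative assumptions for reasoning about which entries appear; they are not the runtime's implementation.

```python
# Assumed ordering from least to most severe.
LEVELS = ["DEBUG", "INFO", "WARN", "ERROR"]


def is_visible(entry_level, bulletin_level):
    """Return True if an entry meets or exceeds the bulletin level."""
    return LEVELS.index(entry_level) >= LEVELS.index(bulletin_level)


# Compare the default threshold (WARN) with the most verbose one (DEBUG).
for bulletin_level in ("WARN", "DEBUG"):
    visible = [level for level in LEVELS if is_visible(level, bulletin_level)]
    print(f"Bulletin Level {bulletin_level}: {visible}")
```

With the threshold at WARN, the info and debug calls added earlier produce nothing visible, which matches the behaviour described above.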

Adjusting log level

You can change the Bulletin Level in the Settings tab of Edit Processor.

Update Bulletin Level to DEBUG for the processor.

Remember to Apply your changes.

Locating log entries

Hovering over this bulletin indicator will render a new pop-up to the right listing the latest log entries.

Keep in mind that the bulletin indicator will also appear on the process group that contains the processor.

Removing the package does not automatically remove associated processors. However, they will be surrounded by a dashed line border (often referred to as a ghost) which indicates the target NAR is missing.

You can verify this by hovering over the yellow triangle. You should see a Missing Processor error message.

Restart the runtime

Fortunately, the Datavolo runtime restart sequence attempts to resolve any ghost processors that may be present. To initiate a restart, select Runtimes from the navigation drawer in the upper right of the UI.

Select Restart from the vertical ellipsis menu at the far right of your runtime.

This action refreshes the environment and can resolve issues related to processor updates or other runtime anomalies.

Manually upgrade the processor

Once your runtime has been restarted, you can replace the old processor by following these steps.

  1. Right-click each incoming and outgoing queue, select Empty queue, and then select Delete.
  2. Right-click the affected processor and select Delete.
  3. Add a new processor from the component toolbar with the correct version number.
  4. Recreate the connections that were previously removed.
  5. Auto-terminate any relationships that require it.
  6. Set any required properties, including references to the appropriate Parameter Context parameters.

You can verify that the new version of the processor is being used by checking its subtitle.

Checking if the processor was loaded

From Local Extensions you can confirm whether a processor was successfully loaded. Click the vertical ellipsis to the right of the extension and choose View Extension Types from the contextual menu.

In the Extension Types pop-up window you should see the name and version of the loaded processor.

Possible solution: Missing build target

If a new Hatch project was started using the hatch new command, it is possible that the pyproject.toml file is missing a tool.hatch.build.targets.nar section.

When composing a NAR, the build command needs to be aware of the locations of resources that should be included in the final bundle. This information is represented as a list of locations.

Here is an example of a valid pyproject.toml file.

[build-system]
requires = ["hatchling", "hatch-datavolo-nar"]
build-backend = "hatchling.build"

[project]
name = "processors"
dynamic = ["version"]
dependencies = []

[tool.hatch.version]
path = "src/processors/__about__.py"

[tool.hatch.build.targets.nar]
packages = ["src/processors"]

Possible solution: Missing dependency

If your processor references an external package, ensure that it is listed in both the dependencies parameter of the ProcessorDetails class and the pyproject.toml file.

Here is an example of how to configure a processor using the requests library.

from nifiapi.flowfiletransform import FlowFileTransform


class Processor(FlowFileTransform):

    class Java:
        implements = ['org.apache.nifi.python.processor.FlowFileTransform']

    class ProcessorDetails:
        version = '0.0.1-SNAPSHOT'
        description = '''
        An example processor utilizing the requests library.
        '''
        tags = ["example", "tutorial"]
        # External packages required by this processor.
        dependencies = ["requests==2.32.3"]

Importantly, the same dependency should be listed in the pyproject.toml file located in the Hatch package.

[project]
dependencies = [
  "requests"
]

You can verify that the package was built with the dependency by inspecting the console output generated by the hatch build command.
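
A mismatch between the two lists can also be caught with a quick comparison before building. The sketch below is a hypothetical helper, not part of Hatch or the framework. It strips version specifiers with a simple regular expression, which is an approximation that ignores extras and environment markers.

```python
import re


def package_name(requirement):
    """Extract the bare, lowercased package name from a requirement string."""
    match = re.match(r"^\s*([A-Za-z0-9][A-Za-z0-9._-]*)", requirement)
    if match is None:
        raise ValueError(f"Cannot parse requirement: {requirement!r}")
    return match.group(1).lower()


def missing_from_pyproject(processor_deps, pyproject_deps):
    """Return processor dependencies that pyproject.toml does not list."""
    declared = {package_name(dep) for dep in pyproject_deps}
    return sorted(
        package_name(dep)
        for dep in processor_deps
        if package_name(dep) not in declared
    )


# Values taken from the two examples above.
print(missing_from_pyproject(["requests==2.32.3"], ["requests"]))
# The same processor list compared against an empty pyproject list.
print(missing_from_pyproject(["requests==2.32.3"], []))
```

An empty result means every package named in ProcessorDetails also appears in pyproject.toml; it does not check that the version constraints agree.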

Congratulations, you've completed the Error handling in NiFi Python processors tutorial!

What you learned

What's next?

Check out some of these codelabs...

Further reading

Reference docs