Last Updated: 2024-09-04
Datavolo offers rich templates of common AI data engineering use cases in our Showcase Flows that are ready to customize and run quickly. This tutorial walks data engineers step-by-step through the process of building an AI pipeline (albeit a simple one) and provides a RAG app test harness to validate the ETL job with. It also helps familiarize those without extensive NiFi flow building experience gain confidence developing in Datavolo.
In this tutorial, you will construct a complete AI data pipeline using Datavolo Cloud and validate the results of it with a provided RAG application test harness.
Once you've completed this tutorial, you will be able to:
For this tutorial, you will construct an AI data engineering pipeline based on personal diary entries. You will also leverage a supplied RAG application test harness to validate that LLM results were improved for relevant questions.
The test data is available here. It does NOT need to be downloaded, but you can review it if desired. The following is an excerpt of what the data file looks like.
The following image was originally presented in Understanding RAG. The ETL Pipeline for AI will be stubbed out for you to complete during this tutorial. A functioning (albeit simple) RAG App test harness will be provided to validate the benefit of the data pipeline. OpenAI will be used for the LLM and Pinecone will be the vector database for our tutorial.
Log into Datavolo Cloud and access (create if necessary) a Runtime.
Download the Exploring_RAG.json
flow definition file and then import it onto the top-level NiFi canvas.
After importing the template, you should see a process group named Exploring_RAG on your canvas. Enter the process group. If you zoom all the way out, the contents of this new process group will look similar to the following.
The yellow area is highlighting the RAG app test harness and the green identifies the ETL pipeline that is stubbed out for you. Both of these flows will be explored in this tutorial.
You will be using OpenAI for creating embeddings and for LLM prompts. The Pinecone vector database will be used to store & search embeddings. This step focuses on obtaining access keys to these 3rd party systems that will be used in later steps.
Visit https://platform.openai.com/api-keys to login or sign up. Once you get to the Dashboard, click API keys on the left nav. You will see multiple ways to Create new secret key.
Create new secret key that is Owned by You. Set the Name to Tutorials
and leave Project and Permissions in their default setting before clicking on Create secret key.
Be sure to Save your key somewhere that you can access it in a subsequent step.
Visit https://www.pinecone.io to login or sign up. You will likely be asked to answer a few questions to personalize your Pinecone experience before you can Complete setup. Next, you will be presented with a Welcome to your Default project page that has the Create index option pre-selected like shown below.
You have multiple ways to create the index. For simplicity, this tutorial is presenting how to perform this task with a curl
command. To continue with that approach, select Shell in the language pulldown.
Copy and paste the presented lines into an editor of your choice.
Navigate to API keys in the left nav to find the Default API keys list. Copy the default key to your clipboard and save it along with your previous API key for use it next, and in future steps.
In your editor, make the following changes to the curl
command by the line numbers below.
YOUR_API_KEY
with the Pinecone API key you just saved.quickstart
with diary
.8
with 1536
and remove the comments starting at the #
sign.euclidean
with cosine
and remove the comments.With exception of the API key, you can use this updated curl
command when you execute it from the command line.
curl -s -X POST "https://api.pinecone.io/indexes"\
-H "Accept: application/json"\
-H "Content-Type: application/json"\
-H "Api-Key: abc123abc123_use_YOUR_key_here"\
-d '{
"name": "diary",
"dimension": 1536,
"metric": "cosine",
"spec": {
"serverless": {
"cloud": "aws",
"region": "us-east-1"
}
}
}
To help you focus on the task at hand, the template includes an appropriate Parameter Context and the necessary Controller Services. To initialize them, ensure you are inside the Exploring_RAG process group.
Right click on an empty area on the canvas inside the Exploring_RAG process group and select Parameters from the contextual menu. You should see the Edit Parameter Context window.
Clicking on the vertical ellipsis and Edit will allow you to set your API key values for the OpenAIKey and PineconeAPIKey parameters. Leave PineconeIndex set to diary
unless your configuration is different.
After updating these 2 values, click on Apply and then Close. Click Back to Process Group to return to the canvas inside the Exploring_RAG process group. These parameters will be referenced in future steps of this tutorial.
Right click on an empty area on the canvas inside the Exploring_RAG process group and select Controller Services from the contextual menu which will render the list of Controller Services. For each of the 3 present, click on the vertical ellipsis and then Enable.
For each Enable Controller Service window, click on Enable and then Close. Verify that all 3 are reporting a STATE of Enabled then go Back to Process Group.
These controller services will be referenced in future steps of this tutorial.
Ensure your navigation breadcrumbs indicate you are inside the Exploring_RAG process group.
Enter the process group named 1) Acquisition where the activities for this step will be completed.
The Get Diary processor has already been created for you. It is of type InvokeHTTP
and simply retrieves the contents of the file referenced in the HTTP URL property.
For this tutorial we will only ingest a single file. Right click on Get Diary and select Run Once.
Notice that the Out metric shows 1 Flow File has been emitted, the Response connection is queued with that same 1 FlowFile (since the Journal Text output port is not running), and that the Get Diary processor is also in a Stopped state.
Right click on the Response connection and select List Queue. On the Response window that surfaces, click on the vertical ellipsis on the far right of the single entry of this list to select View Content.
Verify that you see the content of the file that was ingested into Datavolo. Close this newly opened tab.
Again, this is a simple example for this tutorial. The unstructured data file could be a much more complex document type than plain text and there will very likely be many files to ingest.
Right click on the Journal Text output port and select Start. This will allow the Response queue to empty.
Click on Exploring_RAG in the lower left corner's breadcrumbs to return to that process group and notice the FlowFile is queued up just outside the next processor group.
Ensure your navigation breadcrumbs indicate you are inside the Exploring_RAG process group.
Enter the process group named 2) Parsing where the activities for this step will be completed.
You will see this process group only has an Input Port and an Output Port. This is where you would use one, or more, appropriate processors to parse the data. For example, if the document was a PDF you would add a ParsePdfDocument
processor.
In this tutorial, we are ingesting a simple text file and have a much less complex task. If interested, watch the following video of what this effort would look like for a PDF full of text, images, and tables.
Additional tutorials will address advanced document parsing such as shown above. In this tutorial we will continue with our text file.
In the parsing activity one of the major efforts is to create a hierarchy within the input document. If you remember, our input file is a series of diary notes that are annotated with **journal-item
at the beginning of each note. This primitive markup offers us a way to visualize the document parsed into the journal entries themselves and does not require additional parsing efforts.
Given the document is already decorated with its own flat hierarchy that can be leveraged in subsequent steps, let's make sure it is being identified as a simple text file.
Add an UpdateAttribute processor and name it Tag As Text
. Add a custom property named mime.type and set the value to text/plain
.
Start the Input Port and the new processor. Verify that the success connection has the same FlowFile from before. List Queue and then View Details (instead of View content as done in the prior step). Change to the Attributes tab of the FlowFile window and scroll down until you verify the attribute named mime.type is present.
Start the Output Port to allow the success queue to empty..
Click on Exploring_RAG in the lower left corner's breadcrumbs to return to that process group and notice the Flow File is queued up just outside the next processor group.
You might have noticed already that we have 2 separate process groups (one for ingestion and another for parsing) and that both just have a single processor in them. If you noticed that, you're very likely asking "do I really need all these process groups?" Well, as the consultant always says... "it depends".
One train of thought would be this is a good way to abstract away the implementations of each of these steps so they can be developed and managed independently. In fact, in a more realistic AI data pipeline both of these steps would have very likely taken more than a single processor to accomplish all the relevant activities.
This would be very useful should you want to swap out the implementation for another in any of the process groups (PGs) on the main flow. For this tutorial, there is an added benefit of letting you see where we are going.
But... for this specific tutorial, these PGs purposefully will be implemented with a single, or a small number of, processors. Therefore, you could refactor the flow along the way to be flat instead of nested inside these chained PGs.
If you decide to go down that path you can see the altered version on the right in the following screenshot.
Even though it might feel like over-engineering, the rest of the tutorial will continue down the path of using a PG for each logical section of the pipeline.
Ensure your navigation breadcrumbs indicate you are inside the Exploring_RAG process group.
Enter the process group named 3) Clean & Augment where the activities for this step will be completed.
As before, you will see this process group is missing an implementation and only has an Input Port and an Output Port. As you can read in our Solution Walkthrough for Advanced Document Processing, these are the classic transformation activities that all ETL and ELT jobs are responsible for.
What you need to implement depends heavily on the quality of the data you have and the opportunities that exist to enrich the data. This portion of the data pipeline could also leverage an LLM for cleaning and augmenting as part of the transformation logic.
For this tutorial's use case, we can simply remove any duplicate white space in the input and then strip out any blank lines. As with NiFi's approach, this means chaining together multiple processors each doing something specific.
Add a ReplaceText
processor to the canvas and name it Remove Extraneous White Space
. Leave Replacement Strategy set to Regex Replace and update Search Value to (\s)+
which "simply" means finding any two blank spaces together.
Auto-terminate the failure relationship. You will wire up the success relationship, which will have the transformed data, shortly.
Add a RouteText
processor and name it Strip Blank Lines
. In the Properties tab, select Matches Regular Expression from the Matching Strategy pulldown menu. Add a custom property named Empty Line and set the value to ^$
which indicates an empty line.
Because the Routing Strategy is set to Route to each matching Property Name, any blank line in the input data will be removed and routed down a new relationship with the same name as the custom property; Empty Line. The remainder of the lines will fall into the unmatched relationship.
Mark the Empty Line and original relationships as auto-terminated.
Connect the Input Port to Remove Extraneous White Space. Make a connection for the success relationship of Remove Extraneous White Space to Strip Blank Lines. Make another connection for the unmatched relationship of Strip Blank Lines to the Output Port. Your flow should look similar to this.
Start the Input Port and notice the same FlowFile from before in the queue connected to Remove Extraneous White Space. View content for this FlowFile and scroll down in the text until you find the second entry for January 15th. Notice that it includes multiple occurrences of extra white space.
Start Remote Extraneous White Space and View content on the FlowFile that is queued into the success relationship. Scroll down to the second entry for January 15th entry and notice the extra white spaces have been removed.
Notice there are blank lines between all of the entries.
Start Strip Blank Lines and View content on the FlowFile that is queued into the unmatched relationship. Notice the blank lines have been removed.
Start the Output Port to allow the FlowFile to leave this PG and queue up before the next PG inside of Exploring_RAG.
Ensure your navigation breadcrumbs indicate you are inside the Exploring_RAG process group. Enter the process group named 4) Chunking where the activities for this step will be completed.
Understanding RAG identified chunks as the following.
LLMs are inputted with a massive amount of textual data. Much like how we as humans tackle problems by breaking them down into smaller pieces, the training process creates chunks of this input data. For simple understanding, think how a book is broken down into chapters which are then broken down into paragraphs.
For this transformation step in the pipeline, input data from the diary file needs to be chunked.
Add a ChunkText
processor in this PG, name it Chunk into individual entries
, and auto-terminate the original relationship. Make a connection from the Input Port into this processor. Create a connection for the success relationship of this processor to the Output Port. Your flow should look similar to the following.
In the new processor's Properties tab, notice that there are multiple Chunking Strategy options. More details on these choices can be found in the ChunkText
documentation. Leave the default value of Recursive Delimiters and change Max Chunk Length to 350
to more align with the size of the journal entries and change the Chunk Delimiters to be **journal-item
.
Start the Input Port and the new processor to see that 26 chunks have been created from the single input document. List Queue to see them all. Notice that none of them represent content that is larger than the 350 bytes as controlled by the Max Chunk Length property.
View content on some of these FlowFiles and you will notice that most are complete journal entries such as the third FlowFile in the list.
Start the Output Port to allow the chunked FlowFiles to leave this PG and queue up before the next PG inside of Exploring_RAG.
Ensure your navigation breadcrumbs indicate you are inside the Exploring_RAG process group. Enter the process group named 5) Create embeddings where the activities for this step will be completed.
Understanding RAG identified embeddings as the following.
Computers don't really speak languages such as English and Spanish, but they can create mathematical representations of those chunks. They can also determine appropriate associations between these "embeddings" that are calculated. I think our minds do something similar, finding embeddings of memories that are closely related to the topic we are thinking of. It is no wonder they call this a neural network..
The chunks of text created in the prior step need to be transformed into embeddings.
Add a CreateOpenAiEmbeddings
processor in this PG and leave the default name that is created as it is descriptive enough. Auto-terminate the failure relationship. Make a connection from the Input Port into this processor. Create a connection for the success relationship of this processor to the Output Port.
Hover over the yellow invalid indicator of the processor to see that 3 required properties still need to be set.
For the Properties of this processor, select the only options available in the pulldown menus for Record Writer and Web Client Service. These are 2 of the Controller Services the flow definition template file provided and that you Enabled earlier in this tutorial. For OpenAI API Key, type in #{OpenAIKey}
which will use the sensitive value you set in the shared Parameter Context earlier in this tutorial.
Start the Input Port and the new processor to see that the prior 26 chunks have all been transformed into embeddings.
List Queue to see them all. View content for a random FlowFile to see the start of the embedding itself as well as the text that was used to create the embeddings representation..
Start the Output Port to allow the embeddings to leave this PG and queue up before the next PG inside of Exploring_RAG.
Ensure your navigation breadcrumbs indicate you are inside the Exploring_RAG process group. Enter the process group named 6) Store embeddings where the activities for this step will be completed.
Storing embeddings simply means we need to persist them into a vector database that is accessible by the AI apps so they can benefit from the data pipeline we have been constructing.
Add an UpsertPinecone
processor in this PG and leave the default name that is created as it is descriptive enough. Make a connection from the Input Port into this processor and start the Input Port so the FlowFiles will queue up. Since this is the end of the flow, we won't have a connection to route FlowFiles to. To help us have some observability in our tutorial testing, route the relationships to different funnels as you see below.
Hover over the yellow invalid indicator of the processor to list the required properties that still need to be set.
See if you can set these Properties on your own. Remember, you can find the param names by right clicking an empty part of the canvas and selecting Parameters. If needed, refer back to the prior step where you did something very similar.
You will also need to set these two properties to the values shown. This will allow the vector database records to include the original text along with the actual embeddings values.
Once the new processor is valid, start it and verify that all 26 embeddings were stored in the Pinecone vector database by seeing 26 FlowFiles in the success queue. After verifying they are present, trigger the Empty Queue functionality as they are no longer needed.
As discussed in the Do we need all these PGs? step... no we don't. Again, some very good reasons you might choose to do so. That aside, this very simple ETL flow could be flattened to fit nicely within the Exploring_RAG process group.
The logical flow from this diagram is detailed in Understanding RAG.
Within the Exploring_RAG process group there is a flow that is sitting inside a yellow label. This flow is the very simple test harness which acts as a primitive RAG application. Right click on the AI Test Harness PG and select Start. Ensure that this PG shows 10 running processors as highlighted below.
Open the Properties of the Submit Prompt processor at the top of the flow. Notice that the value of Custom Text is initially set to the first question listed in the ** SAMPLE PROMPTS part of the label above.
To run the test harness with this question, right click on Submit Prompt and select Run Once – do NOT select Start. There should now be one FlowFile in each of the NO aug and RAG app queues (and the Submit Prompt should be shown as stopped).
View Content for both of the outputted FlowFiles and the answers received should be similar to the following.
As you can see, the LLM request that only uses the publicly available information on The Flintstones does not come back with anything definitive about Fred's interest in snail mail. In fact, it seems to ramble on to justify why it doesn't have a valid answer. It even formally states that "mail is not a central theme" for Fred.
Conversely, the augmented LLM request includes journal entries related to his enjoyment for sending & receiving postcards and therefore able to provide a more definitive, and succinct, response to the question is returned.
Follow the same process of updating the Custom Text and triggering Run Once on Submit Prompt to try additional questions and review the results. Here are examples of responses to the remaining ** SAMPLE PROMPTS above, but feel free to create your own.
How does Fred Flintstone feel about public transportation?
Is Fred Flintstone someone who frequently journals?
How often does Fred Flintstone go to the movies?
Review the diary entries processed in the data pipeline to see if you feel the RAG app test harness is working well with these sample questions and those that you have come up with on your own.
As Datavolo is not necessarily marketed as a product to generate AI applications, the RAG app test harness can be treated as a simple black box testing tool.
You are encouraged to review the flow within the AI Test Harness process group. You might even decide it is something you could reuse on your AI data engineering journey with Datavolo!
Congratulations, you've completed the Exploring RAG tutorial!
Check out some of these codelabs...