In part one of our series with Cardinal, learn to transform AWS VPC logs into the query-ready OCSF format. We'll build a streaming data pipeline using Fleak's OCSF Mapper, Zephflow, and Kinesis.

By
Gerrit Jansen van Vuuren
Fouding Engineer, Fleak
This is the first of a two-part series, a collaboration between Fleak and Cardinal, that shows how to transform AWS VPC logs into OCSF format. In this part, we’ll use Fleak’s OCSF Mapper and Zephflow workflow engine to build the conversion pipeline. In the second part, we'll show how to store the data as Parquet files in S3, making it ready for powerful observability and querying through the Cardinal platform.
We’re going to use AWS’s VPC logs as an example, but this process can be applied to any logs that come from any source. We’re using Kinesis as an input and output queue because it helps scale out when processing vast volumes of data, and it has a feature that can automatically convert data into parquet files and write to S3, taking care of the buffering, reliability, and timeouts. It’s not without its flaws, but it does the job. Note, you can also switch out Kinesis for Kafka.
Before we run anything, let’s see how we can map VPC logs into OCSF.
Creating an OCSF Transformation
In this section, we’ll focus on the log transformation itself. Our goal is to take an AWS VPC log sample and define a mapping between it and the OCSF network activity format. We’ll use Fleak’s OCSF Mapping tool to accomplish this task. It will guide us through the stages of identifying the correct OCSF class, setting up parsing logic, and then working through the mappings.
The mapping language used is Fleak’s own open-source Fleak Eval Expression Language (FEEL), and can be run using the open-source Zephflow workflow tool. Zephflow comes with some other goodies like running SQL—without a database—and using Python to process your logs.
The OCSF Mapping tool uses LLM models to help you decide what to choose and generates a fairly accurate mapping template for you. This, combined with some solid old-school validation, makes the tool a huge time saver when creating data mappings.
Before we can start, let’s look at a few commands to get a VPC log sample.
Getting a VPC log sample
First, make sure VPC Flow Logs are enabled for your target VPC and configured to publish to CloudWatch Logs.
To find your log group:
This should output a log group. If you do not see anything, please ask your administrator to enable VPC flow logging.
If you have VPC flow logging enabled, use the output from the command above and run the following command to get some sample logs:
You should get output similar to the following:
Note: remove the first two columns with date and eni-all, these will not be sent to Kinesis when using the Kinesis integration—thanks, AWS.
Here’s a sample from my VPC:
Now that we have our sample log, we’re ready to explore the Fleak OCSF Mapper.
Getting started with the OCSF Mapper
Open https://app.ocsf.fleak.ai/ in your browser, and follow the instructions to log in. Then click to create a new project. Once inside your project, click on the New Mapping button, and select the “text log” option.
When we set up a mapping, we first need to select the type of logs that we’ll receive. This primarily helps with configuring a parser. The parser is the first stage to get our log into JSON and assign column names to all fields. Once we can convert our input into a JSON object, we’re ready to define the mapping logic.
You should now see a text area where you can paste in the VPC log sample—each log input accepts a single log entry. You can also add additional context data, which helps the LLM understand the sample log. Go ahead and paste in the VPC log sample, and in context, data type “AWS VPC logs”. Then click on the “Continue to parser configuration” button.
In the parser configuration panel, select the type of log and fill in the log type-specific configuration. In our case, we’re using a space-separated log line. So select “delimited” and fill in the log delimiter with a single space. There’s no sane way for the software to know the columns, so you’ll need to specify the columns for each of the log fields. In our case, a simple Google search gives the following:
Click on the “Preview” button to check that the parser configuration is correct. If it’s working, you should see something like the image below:

Once you have the parser configuration complete, click on the “Analyse and map to OCSF” button. This will take a while, as it uses an LLM to find matches for the correct OCSF classes. Once it’s complete, you should see something like the image below: This shows the OCSF log type that matches the logs. You can click on it, and more options will appear.

Select the OCSF type and click the “Generate Mappings” button. During this process, we use an LLM to generate the initial mapping logic using Fleak’s eval language. The logic will try to map each of the input fields to a determined OCSF field. It may take up to 5 minutes—time for a coffee or tea. Once complete, you should see something like the image below:

Don’t be frightened, this is the mapping logic, and be glad you didn’t need to do each by hand. Click on the “Visual to Code” toggle, which will show a much nicer view, as shown below:

This view allows you to update the transformation logic and see how your log sample gets transformed in real time. If you feel lost, it’s a good idea to visit the FEEL documentation. Next, we’ll check and work through any failed transformations, where the LLM tried its best but found an error due to the input.
Updating and fixing mappings
In this section, we will work through a parsing issue that came up while doing this blog post. You may get others.
We’re going to write some eval language case statements. If you get lost, see: https://docs.fleak.ai/zephflow/feel-ref.
If any value is missing in the VPC log, AWS will put a -
in its place. This doesn’t work for fields that are expected to be numbers, like ports, and the parsing logic will fail when this happens.
For example, with dst_port.
To fix this, we’ll use a case statement to check for ‘-’ values. The Fleak eval language case statements look like:
So, in our case, the statement would be:
Here we test if the string is -
and return zero, otherwise we return the value.
Do the same change, everywhere parse_int is used. It’ll save you some parse errors once we start running this against real data.
Once you’re done, look through the Generated Output pane and check that all the fields make sense. There is also an Input/Output Logs tab where you can experiment with multiple logs and see how they map to OCSF.
A mapping on its own doesn’t do any good unless we can run it against some data and get the transformations out. Let’s take a look at that next.
From Mapping to Execution: Running Locally with Zephflow
Our mapping is made up of two configurations: one for parsing the input log and another for mapping the parsed data into OCSF. These configurations are run in the context of a command node. They are, in fact, the configuration for Zephflow’s Parser and Eval commands.
All Zepflow commands are run inside a workflow that specifies a source input, several commands, and a final sink output. The OCSF mapping app gives you a workflow that takes data from a file, threads it through the parser and eval commands, and then writes it to stdout. We’ll modify this workflow throughout this blog post series and update the source and source commands to user kinesis.
Let’s download the workflow and save it to a place where we can work with it. Click on the download button and select the “Download files” tab. Then click on the “Download Dag Definition Yaml” button to download the workflow. The file name will look something like this: dag-definition-4d85131d-8020-408f-bc5a-eacdefe7b7c1.yml
—rename it to vpc-ocsf.yml
.
Next, we’ll take a look at processing our sample VPC log locally.
Running your workflow
Before we jump into Kinesis and other things, let’s just give our workflow a spin. Copy the original VPC log sample you used into a file named vpclog.txt
While downloading the workflow, you would’ve noticed there were other tabs showing how to run the workflow. We’ll follow the tab explaining how to run the workflow from the CLI.
To start, pull the Zephflow Docker image:
Next, run the following Docker command in the same directory where you have the vpclog.txt and vpc-ocsf.yml workflow file.
If the mapping is done correctly, you should see some log output and then the JSON message representing the OCSF output.
We have a workflow that can take VPC log input and transform it into OCSF logs. But this workflow only runs on files; ideally, we can pull from Kinesis and write back to S3 so that further pipelines can use our OCSF logs.
Running a real-world transformation example
In this example, we’re going to use Terraform to configure AWS resources. If you want to use just the AWS CLI or the AWS console, feel free to do so. Our goal is to get VPC logs into Kinesis. Then read and transform the logs with Zephflow.
You’ll need AWS CLI credentials and permissions to create CloudWatch Logs and Kinesis Data streams. Let’s start with creating a Kinesis data stream.
Streaming VPC logs to Kinesis
First, you need to select a VPC. If you don’t have one, check with your account administrator first on how to create a test VPC.
The VPC log export to Kinesis works in two parts. First, we write to CloudWatch logs, and then we use a subscription to write from CloudWatch to Kinesis.
Before that, export the VPC and Kinesis names using the following commands:
Create the Kinesis data stream using the following command:
Wait for the stream to become active using:
Next, we’ll need to set up roles and permissions for writing to CloudWatch:
Create a role definition
We also need to create a policy with permissions to write to CloudWatch:
Creating a policy to write to CloudWatch
Set up a CloudWatch group.
Set up the VPC flow logs to the CloudWatch group
Now our VPC logs will be written into CloudWatch. Next, we want to export the CloudWatch logs automatically to Kinesis.
Start by creating a trust policy and iam role
Then, create a role policy giving access to Kinesis.
Lastly, let’s connect CloudWatch and Kinesis.
That’s it — VPC Flow Logs are now streaming into Kinesis in near real-time, with IAM roles and CloudWatch working together behind the scenes.
Transforming logs from Kinesis with Zephflow
In this section, we’ll modify our workflow to read from Kinesis. We’ll do so locally, but in production, you would want to wrap this up in a container or run it in an EC2 instance.
Zephflow workflows have an input—source—and an output—sink. There are multiple implementations for these, and they are available to the workflow as nodes. We are going to use the “kinesissource” command and provide it with the necessary configuration in our workflow.
For our example, we’ll assume the data stream name is “vpc_flow_logs”. Please change this to the name of the Kinesis stream you configured earlier.
If you look through the workflow, you should find a node with commandName: “stdin”. Replace this with the following configuration:
The config is a bit of a pain, but it’s passed as a string to the nodes, so needs to be wrapped and escaped. Take a moment and make sure the streamName and regionStr parameters are correct. You can change the applicationName to what you want.
Let’s take a quick look at what the other parameters are.
compressionTypes: AWS VPC logs are sent gzip compressed. This part tells Zephflow that it should decompress the data it receives from Kinesis.
encodingType: This part tells Zephflow that the data is a JSON object, after it’s been decompressed.
Now, if you plug this into the workflow and run it as is, it wouldn’t do much and probably fail. Our parser is expecting a JSON message with a field where it can find the VPC log string. But AWS sends us the VPC logs wrapped in an array.
We’ll use the SQL node to unwrap the message array.
Add the following node to your workflow:
This uses a SQL node to run SQL that unwraps the message. Input data is available as an events table, and the json_array_elements command is similar to its Postgres counterpart. Note: We don't need to run a database here. The SQL is interpreted and executed by Zephflow. The json_array_element will read the events.logevents field and return a row for each message. Inside each message, there’s a message field, which we select.
Next, update the kinesissource node to output to the sql node like this:
When our workflow runs now, it will read data from Kinesis, then process it with the sql node—which unwraps the messages—and pass it one by one to the parser_0 node.
Check that the parser node’s targetField is “message”. This is the field where the vpc logs are.
The parser node also contains the parser config with delimited text and the columns for each field. The output for the parser_0 should point to the eval node, and finally, the eval node will output to the stdout node, which just prints the transformed messages.
Let’s see how we can run our workflow with Python to avoid authentication issues when running inside Docker locally. If you’re ok to run your workflow via Docker and it picks up your AWS credentials correctly, then go for it; otherwise, read on to the next section.
Running our workflow with Python
We’ll use Zephflow’s Python SDK to run our workflow locally and read from the Kinesis data stream we created. You will need Python 3 for this.
To install the SDK run the following command:
Then create the following script—name it vpc_ocsf_transformer.py
:
This will execute the YAML workflow—make sure it exists in the same directory as the Python script. Then run it using:
You may see an error about EC2 instance metadata; you can ignore that. Then you should see the Kinesis consumer library ramping up and lots of output. Kinesis takes a moment, and if this is the first time, it will create a DynamoDB table. Waiting for the table to become active and all the things the AWS KCL library needs will take a while.
Once the resources are created, you should see either error messages or your transformation output.
Nothing is ever easy, and reading from data pipelines can take time. Be patient and ask your AWS administrator—or a friend if you're the admin—for help. Once your workflow is running and you can see the data being read and processed, you’re ready to move on to the second part of this series. But first, let’s look back at what we achieved.
Recap and What's Next
In this post, we’ve explored how to use the Fleak OCSF Mapping app to create mappings. The app helps with identifying the correct OCSF class type, setting up parsing logic, and creating mappings from our source log into OCSF. Almost all of the mappings can be used as is, and there were a few places where we had to add more logic. But all in all, you can create an OCSF mapping in a matter of minutes. Imagine if you had to do this by hand…
Once we had our mapping, we walked through the steps of running it using Docker. Then we dived into the main theme of this blog: reading VPC logs. It’s not just enough to map the logs; you also have to read them from somewhere. So we looked at how to get VPC logs into Kinesis and then how to run our Zephflow workflow to read from Kinesis and output to stdout.
In the next part, we’ll explore how to set up a destination that receives our mappings and writes the OCSF data into S3 as Parquet files. This will prepare the data for the final step: enabling powerful, real-time observability and analytics with Cardinal.
Other Posts
Jul 2, 2025
The OWASP LLM Top 10 for 2025: A Practical Security Guide for Engineering Teams
The OWASP 2025 LLM Top 10 is here, targeting real-world attacks. Our guide for engineers breaks down new threats like Vector Security and Prompt Leakage, offering practical tips to secure your LLM apps from sophisticated exploits.
Jun 15, 2025
OCSF to S3: Streaming with Kinesis, Firehose, and Zephflow
In Part 2, we build the final stage of our pipeline. Learn to stream OCSF logs to S3 as Parquet using Kinesis Firehose, a Glue schema, and a Zephflow sink, making your data ready for large-scale analysis.
Jun 13, 2025
From VPC Logs to OCSF: A Streaming Pipeline with Kinesis and Zephflow
In part one of our series with Cardinal, learn to transform AWS VPC logs into the query-ready OCSF format. We'll build a streaming data pipeline using Fleak's OCSF Mapper, Zephflow, and Kinesis.