A Step-by-Step Guide: From Okta Logs to OCSF in Databricks

A Step-by-Step Guide: From Okta Logs to OCSF in Databricks

Transform Okta logs to OCSF for analysis in Databricks. This guide covers automated mapping with Fleak, building a data pipeline with ZephFlow, and querying your logs in a Delta Lake. Simplify your security analytics workflow now.

By

Gerrit Jansen van Vuuren

Fouding Engineer, Fleak

In this post, we will transform Okta logs into OCSF and make them available in Databricks. Okta logs are important, but only one part in a whole lot of other logs in any enterprise. Having security logs in a single standard format like OCSF is invaluable.

This all sounds very good, but we first need to define mappings and then run them against our logs. That’s only one part of the story, though. Once the logs are in OCSF, we have to make them available for analysis. 

We can stick logs into a database, but logs grow, and we would want to keep them for years. The best and cheapest approach is to use S3. The issue here is that S3 data lives in files and is not readily queryable. That’s where formats like Deltalake and platforms like Databricks come in.

The following sections cover each part of this process in detail and make use of AWS, Fleak’s OCSF Mapping and ZephFlow workflow tools, and Databrics. 

Let’s start with the biggest blocker most organizations face—Mapping logs.

Mapping Okta to OCSF

Mapping between log formats can take days or weeks, and even then, after spending endless hours glaring at a screen, you’re still not sure that you got everything. Having the aid of automated tools is invaluable, and instead of taking days, mapping files and formats can take minutes. Combine that with a tool that can run your mappings and you’re in mapping heaven.

We’ll use Fleak’s OCSF Mapper and its built-in templates to map Okta logs to the correct OCSF categories. Okta’s system logs map to about four main OCSF classes: account_change, api_activity, authentication, and api_findings.

We’ll walk through creating one mapping config using a sample log, then repeat the same steps for the rest. Once we’ve got our mappings for each class, we’ll save them to disk—these will be used later in the workflow that actually processes incoming logs.

Let’s build the first one.

Open up https://ocsf.ai site and search for “Okta”. 

You should see a template for Okta authentication system logs like below:

Click on the template, and it’ll open the template details display. You can see the mapping syntax details. 

Let’s click on the “Use Template” button, which takes us to the Fleak OCSF Mapping app. You’ll have to sign up or log in, which is simple via SSO. 

Once you’re signed in, you should see a page like the one below:

Select new project, to create a project, and then click on the “Create Mapping” button.

This opens the OCSF Mapping workspace. Here you can tweak and modify the mapping logic by passing in sample input data and seeing how the mapping logic transforms the message. Use the Input/Output Logs tab for that.

Let’s see how that works. Click on View to Code toggle. You’ll see the mapping code, a sample input log and the generated output data.

The OCSF Mapper, has lots of curated, readymade templates like these. You can also create your own from scratch, and the app utilizes LLMs to generate the vast majority of the mapping logic. You only need to evaluate and tweak the mapping logic to handle edge cases or items that were missed by the LLM. 

To save the transformation logic, click the download button, then select the download tab. Then click on the download DAG yaml option. Save this YAML file—we’ll use it in the following sections.

Repeat the same step for the other three Okta types: account change, api activity, and findings. We’ll use the mapping logic for each later on in a single workflow. 

Getting Okta logs into AWS

Okta makes the System logs available to us either as an API, a Report, or through one of its integrations.

Describing how to setup Okta and configure AWS integration is a blog post in itself. If you just want to follow along and don’t have an okta account you can use the sample data at the end of this post. Or open up a developer account and download a System Log csv report. 

There are several ways to configure Okta to send logs to AWS. One way is via the AWS event bridge. Then, from event bridge, you can send logs to Kinesis and use the Zephflow Kinesis source command.

- id: source
  commandName: kinesissource
  config: '{"streamName": "<your-kinesis-stream-name>", "regionStr": "<your-aws-region>", "encodingType": "JSON_OBJECT", "compressionTypes": ["GZIP"], "applicationName": "<your-app-name>", "initialPosition": "TRIM_HORIZON"}'
  outputs: []

Otherwise, you can use the template data and keep using the file source command as shown next. This option is the simplest when testing your mapping logic.

 - id: source
    commandName: filesource
    config: '{"filePath": "/app/event.json", "encodingType": "JSON_OBJECT"}'
    outputs: []

Once we have a way to get the system logs, we’re ready to convert them to OCSF.

Converting Okta logs to OCSF And Into S3 Delta Lake

We already have our mappings for the different OCSF types. Now we need a way to read input data, identify the log type, send it to the right mapping logic, and write it out to S3 in the Delta Lake format. 

The following diagram shows how our data will flow:

Let’s start with the basic workflow.

Create a directory where you’ll place your workflow and test input. Then create a YAML file—okta-ocsf-dag.yml— like the one below:

jobContext:
  otherProperties:
  metricTags:
  dlqConfig:
dag:
  - id: source
    commandName: filesource
    config: '{"filePath": "/app/event.json", "encodingType": "JSON_OBJECT"}'
    outputs: []

This creates a Zephflow workflow that reads its events from a file called `/app/event.json`. We’ll use Docker to run this workflow, and mount one of our test events to this file.

Alternatively, if you’re using Kinesis use the following source node:

 - id: sink
    commandName: kinesissink
    config: '{"encodingType": "JSON_OBJECT", "streamName": "<STREAM NAME>", "regionStr": "<REGION>"}'
    outputs: []

Fill in the region and stream name parts to match what you have deployed on AWS.

Next, let’s see how we can select the right OCSF type from the system logs.

Splitting System Logs Into OCSF Types

We’ll use Zephflow’s Filter node to split logs into different OCSF types, based on their event type. To do this, we’ll tag each log with a ocsf_class, using an eval node and a case statement.

Once tagged, the log is routed to one of several filter nodes and each filter passes the message to its matching mapping node, and finally to the output.

Let’s look at how to tag—classify—the Okta logs:

Log Classification

We can classify the Okta logs by evaluating the eventType field. For  example, authentication logs include event types like user.session, user.authentication.

Open your workflow and add the following node:

 - id: classify
    commandName: eval
    config: |-
        dict_merge(
          $,
          dict(
            ocsf_class=case(
              str_contains($.eventType, "user.session.") or
              str_contains($.eventType, "user.authentication.") or
              str_contains($.eventType, "app.authentication.") => "authentication",
              str_contains($.eventType, "user.lifecycle.") or
              str_contains($.eventType, "user.account.") => "accountchange",
              str_contains($.eventType, "security.") or
              str_contains($.eventType, "risk.") or
              str_contains($.eventType, "access.") or
              $.debugContext.debugData.threatSuspected == true => "finding",
              
              _ => "activity"
            )
          )
        )
    outputs:
      - select_auth
      - select_account
      - select_finding
      - select_activity

This node tags each log by adding the ocsf_class field. The message is then passed to all of the filter nodes. Only one will match the tag and then pass the message to the mapping logic.

Filtering For Log Types:

With classification in place, we can add filters to route each log to its corresponding mapping node. Each filter matches a single ocsf_class value.

Open your workflow file and add the following filter nodes:

- id: select_auth
    commandName: filter
    config: |
      $.ocsf_class == "authentication"
    outputs:
      - authentication
  - id: select_account
    commandName: filter
    config: |
      $.ocsf_class == "accountchange"
    outputs:
      - accountchange
  - id: select_finding
    commandName: filter
    config: |
      $.ocsf_class == "finding"
    outputs:
      - finding
  - id: select_activity
    commandName: filter
    config: |
      $.ocsf_class == "activity"
    outputs:
      - activity

Simple enough. This is our filtering logic done. Let’s update the workflow input and then add the mapping nodes.

Updating The Workflow Input

Before we can continue, we need to update the source node to point to the classify node. This will read from a file—or kinesis—and then send the logs for tagging. Open your workflow YAML and update the source node’s outputs. It should look like the YAML below:

dag:
  - id: source
    commandName: filesource
    config: '{"filePath": "/app/event.json", "encodingType": "JSON_OBJECT"}'
    outputs:
      - classify

That’s it. Our DAG can now read input data and filter based on event type. Let’s add the actual mappings next.

Add OCSF Parsing and Mapping logic.

Now that we’re routing logs to the right filters, it’s time to map them to OCSF.

We’ll use eval nodes for this — each one takes a log message and transforms it into an OCSF event. You’ll need one eval node for each OCSF type.

We’ll walk through the account change mapping together. Once you see how that works, you can apply the same pattern to the others.

An eval node has an id (its name), a config string with your mapping logic, and an outputs list that defines where to send the result.

Remember that at the start of this post we used the OCSF Mapping tool to generate some mappings? Now we´re going to use them.

The first one is account change. Open your workflow file and add the following YAML snippet:

- id: accountchange
  commandName: eval
  config: |-
    dict(
      time=ts_str_to_epoch($.published, 'yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\''),
      actor=dict(
        user=dict(
          uid=$.actor.id,
          name=$.actor.displayName,
          type_id=case(
            $.actor.type == 'User' => 1,
            $.actor.type == 'Admin' => 2,
            _ => 0
          ),
          email_addr=$.actor.alternateId
        ),
        session=dict(
          uid=$.authenticationContext.externalSessionId
        )
      ),
      status=$.outcome.result,
      message=$.displayMessage,
      metadata=dict(
        uid=$.uuid,
        product=dict(
          name='Okta System Log',
          version=to_str($.version),
          vendor_name='Okta'
        ),
        version='1.3.0',
        profiles=array('datetime', 'host'),
        event_code=$.eventType
      ),
      class_uid=3001,
      status_id=case(
        $.outcome.result == 'SUCCESS' => 1,
        $.outcome.result == 'FAILURE' => 2,
        _ => 0
      ),
      activity_id=case(
        str_contains($.eventType, 'lock') => 4,
        str_contains($.eventType, 'create') => 1,
        str_contains($.eventType, 'update') => 2,
        str_contains($.eventType, 'delete') => 3,
        _ => 99
      ),
      severity_id=1,
      category_uid=3,
      http_request=dict(
        uid=$.debugContext.debugData.requestId,
        url=dict(
          path=$.debugContext.debugData.url
        ),
        user_agent=$.client.userAgent.rawUserAgent
      ),
      src_endpoint=dict(
        ip=$.client.ipAddress,
        os=dict(
          name=$.client.userAgent.os,
          type_id=case(
            upper($.client.userAgent.os) == 'ANDROID' => 1,
            upper($.client.userAgent.os) == 'IOS' => 2,
            upper($.client.userAgent.os) == 'WINDOWS' => 3,
            upper($.client.userAgent.os) == 'LINUX' => 4,
            upper($.client.userAgent.os) == 'MACOS' => 5,
            _ => 0
          )
        ),
        type=$.client.device,
        domain=$.securityContext.domain,
        location=dict(
          isp=$.securityContext.isp,
          lat=$.client.geographicalContext.geolocation.lat,
          city=$.client.geographicalContext.city,
          long=$.client.geographicalContext.geolocation.lon,
          country=$.client.geographicalContext.country,
          postal_code=$.client.geographicalContext.postalCode
        ),
        autonomous_system=dict(
          name=$.securityContext.asOrg,
          number=$.securityContext.asNumber
        )
      ),
      activity_name=$.eventType
    )
  outputs:
  - sink

This is where our mapping logic sits. It seems like a lot  because it is, and thankfully we can use modern tools to generate most of it for us. 

Now that the logs are mapped, we need to write them somewhere. We’ll do that with our sink node next.

Write to S3 using Delta Lake

If you're just testing the transformation, feel free to skip this part. But if you want to write the results to S3, here’s how to set up the sink node.

First, ensure you have an S3 bucket and credentials that grant you write access. We’ll use the deltalake sink command to write our logs out.

Since this sink uses Spark under the hood, you’ll need to define the output schema in a compatible way. We already know the structure from our mappings, so we just need to express it in a way Spark understands.

You can find the full schema example in the appendix.

Add the following node to your workflow. Replace the tablePath with your S3 bucket and path, and paste in the Spark-compatible schema for jsonSchema:

- id: sink
  commandName: deltalake
  config: |
    {
      "tablePath": "s3://your-bucket/path/",
      "jsonSchema": "<INSERT_STRINGIFIED_JSON_SCHEMA>",
      "sparkConfig": {}
    }
  outputs: []

This node will take all OCSF-mapped messages and write them to S3. The only thing left is to run the workflow.

Run the workflow 

You can run the workflow using either Python (with the Zephflow SDK) or Docker. Here's how to run the okta-ocsf-dag.yml workflow using Docker and a test input file:

docker run \
  -v ./okta-ocsf-dag.yml:/dag.yml \
  -v ./input.txt:/app/event.json \
  fleak/zephflow-clistarter:latest -f /dag.yml

This mounts your local okta-ocsf-dag.yml file into the container as /dag.yml, and your test input (input.txt) as /app/event.json.

Make sure your AWS credentials are available to the container — either through environment variables or your local profile.

The source node reads the input file, runs it through the workflow, and writes the result to either stdout or S3, depending on how you’ve set up the sink.

That’s it. If everything is wired up correctly and you don’t encounter any errors, you should see output appear in S3 shortly.

Reading data with Databricks

Once your data is in S3, you can hook it up to Databricks using Unity Catalog. There’s a bit of AWS setup involved — enough to warrant its own blog post — but the core idea is this: create a role and set up a trust relationship between Unity Catalog and your S3 bucket.

Once that’s done, just register the S3 path as a Delta table:

CREATE TABLE catalog.schema.table_name
USING DELTA
LOCATION 's3://your-bucket/path/';

And then query it like any other table:

SELECT * FROM catalog.schema.table_name;

No copy, no import — just straight querying over your raw Delta Lake data.

That wraps it up. In this post, we walked through converting Okta logs to OCSF and writing them to S3 so they’re ready for querying from Databricks.

Mapping projects can suck the life out of a team, and a lot end in utter failure — having the right tools to do the heavy lifting makes all the difference. Fleak provides such tools, from an OCSF mapper to a workflow engine to help you through the whole process.

If you need help with your data mappings or want advice on using our tools, we’re happy to chat.

Appendix

OCSF Spark Schema

{
  "type": "struct",
  "fields": [
    { "name": "activity_name", "type": "string", "nullable": true },
    { "name": "metadata", "type": "struct", "nullable": false, "fields": [
      { "name": "uid", "type": "string", "nullable": false },
      { "name": "product", "type": "struct", "nullable": false, "fields": [
        { "name": "name", "type": "string", "nullable": false },
        { "name": "vendor_name", "type": "string", "nullable": false },
        { "name": "version", "type": "string", "nullable": false }
      ]},
      { "name": "event_code", "type": "string", "nullable": false },
      { "name": "profiles", "type": "array", "nullable": false, "elementType": "string", "containsNull": false },
      { "name": "version", "type": "string", "nullable": false }
    ]},
    { "name": "category_uid", "type": "integer", "nullable": false },
    { "name": "logon_type_id", "type": "integer", "nullable": true },
    { "name": "src_endpoint", "type": "struct", "nullable": false, "fields": [
      { "name": "os", "type": "struct", "nullable": false, "fields": [
        { "name": "type_id", "type": "integer", "nullable": false },
        { "name": "name", "type": "string", "nullable": false }
      ]},
      { "name": "ip", "type": "string", "nullable": false },
      { "name": "domain", "type": "string", "nullable": true },
      { "name": "location", "type": "struct", "nullable": false, "fields": [
        { "name": "country", "type": "string", "nullable": false },
        { "name": "city", "type": "string", "nullable": false },
        { "name": "isp", "type": "string", "nullable": false },
        { "name": "postal_code", "type": "string", "nullable": false },
        { "name": "lat", "type": "double", "nullable": false },
        { "name": "long", "type": "double", "nullable": false }
      ]},
      { "name": "autonomous_system", "type": "struct", "nullable": false, "fields": [
        { "name": "number", "type": "integer", "nullable": false },
        { "name": "name", "type": "string", "nullable": false }
      ]},
      { "name": "type", "type": "string", "nullable": false }
    ]},
    { "name": "message", "type": "string", "nullable": false },
    { "name": "logon_type", "type": "string", "nullable": true },
    { "name": "actor", "type": "struct", "nullable": false, "fields": [
      { "name": "session", "type": "struct", "nullable": false, "fields": [
        { "name": "uid", "type": "string", "nullable": false },
        { "name": "issuer", "type": "string", "nullable": true }
      ]},
      { "name": "user", "type": "struct", "nullable": false, "fields": [
        { "name": "uid", "type": "string", "nullable": false },
        { "name": "email_addr", "type": "string", "nullable": false },
        { "name": "type_id", "type": "integer", "nullable": false },
        { "name": "name", "type": "string", "nullable": false },
        { "name": "type", "type": "string", "nullable": false }
      ]}
    ]},
    { "name": "status_id", "type": "integer", "nullable": false },
    { "name": "class_uid", "type": "integer", "nullable": false },
    { "name": "activity_id", "type": "integer", "nullable": false },
    { "name": "http_request", "type": "struct", "nullable": false, "fields": [
      { "name": "uid", "type": "string", "nullable": false },
      { "name": "user_agent", "type": "string", "nullable": false },
      { "name": "url", "type": "struct", "nullable": true, "fields": [
        { "name": "path", "type": "string", "nullable": true }
      ]}
    ]},
    { "name": "time", "type": "long", "nullable": false },
    { "name": "severity_id", "type": "integer", "nullable": false },
    { "name": "dst_endpoint", "type": "struct", "nullable": false, "fields": [
      { "name": "svc_name", "type": "string", "nullable": false }
    ]},
    { "name": "type_uid", "type": "integer", "nullable": false },
    { "name": "status", "type": "string", "nullable": false }
  ]
}

Workflow Dag

jobContext:
  otherProperties:
  metricTags:
  dlqConfig:
dag:
  - id: source
    commandName: filesource
    config: '{"filePath": "/app/event.json", "encodingType": "JSON_OBJECT"}'
    outputs:
      - classify
  - id: classify
    commandName: eval
    config: |-
        dict_merge(
          $,
          dict(
            ocsf_class=case(
              str_contains($.eventType, "user.session.") or
              str_contains($.eventType, "user.authentication.") or
              str_contains($.eventType, "app.authentication.") => "authentication",
              str_contains($.eventType, "user.lifecycle.") or
              str_contains($.eventType, "user.account.") => "accountchange",
              str_contains($.eventType, "security.") or
              str_contains($.eventType, "risk.") or
              str_contains($.eventType, "access.") or
              $.debugContext.debugData.threatSuspected == true => "finding",
              
              _ => "activity"
            )
          )
        )
    outputs:
      - select_auth
      - select_account
      - select_finding
      - select_activity
  - id: select_auth
    commandName: filter
    config: |
      $.ocsf_class == "authentication"
    outputs:
      - authentication
  - id: select_account
    commandName: filter
    config: |
      $.ocsf_class == "accountchange"
    outputs:
      - accountchange
  - id: select_finding
    commandName: filter
    config: |
      $.ocsf_class == "finding"
    outputs:
      - finding
  - id: select_activity
    commandName: filter
    config: |
      $.ocsf_class == "activity"
    outputs:
      - activity
  - id: finding
    commandName: eval
    config: |-
      dict(
        time=ts_str_to_epoch($.published, 'yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\''),
        message=$.displayMessage,
        metadata=dict(
          uid=$.uuid,
          product=dict(
            name='Okta System Log',
            version=to_str($.version),
            vendor_name='Okta'
          ),
          version='1.3.0',
          profiles=array('datetime', 'host', 'security_control'),
          event_code=$.eventType
        ),
        action_id=case(
          $.outcome.result == 'DENY' => 2,
          $.outcome.result == 'ALLOW' => 1,
          _ => 0
        ),
        class_uid=2004,
        evidences=dict(
          src_endpoint=dict(
            ip=$.client.ipAddress,
            os=dict(
              name=$.client.userAgent.os,
              type_id=case(
                $.client.userAgent.os == 'Windows' => 1,
                $.client.userAgent.os == 'Mac OS' => 2,
                $.client.userAgent.os == 'Linux' => 3,
                _ => 0
              )
            ),
            type=$.client.device,
            domain=$.securityContext.domain,
            location=dict(
              isp=$.securityContext.isp,
              lat=$.client.geographicalContext.geolocation.lat,
              city=$.client.geographicalContext.city,
              long=$.client.geographicalContext.geolocation.lon,
              country=$.client.geographicalContext.country,
              postal_code=$.client.geographicalContext.postalCode
            ),
            autonomous_system=dict(
              name=$.securityContext.asOrg,
              number=$.securityContext.asNumber
            )
          )
        ),
        resources=array(
          dict(
            name=$.debugContext.debugData.url
          )
        ),
        severity_id=1,
        category_uid=2,
        finding_info=dict(
          uid=$.uuid,
          title=$.outcome.reason
        ),
        disposition_id=case(
          $.outcome.result == 'DENY' => 2,
          $.outcome.result == 'ALLOW' => 1,
          _ => 0
        )
      )
    outputs:
      - sink
  - id: authentication
    commandName: eval
    config: |-
      dict(
        time=ts_str_to_epoch($.published, 'yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\''),
        actor=dict(
          user=dict(
            uid=$.actor.id,
            name=$.actor.displayName,
            type=case(
              $.actor.type == 'User' => 'User',
              $.actor.type == 'Admin' => 'Admin',
              _ => 'Unknown'
            ),
            type_id=case(
              $.actor.type == 'User' => 1,
              $.actor.type == 'Admin' => 2,
              _ => 0
            ),
            email_addr=$.actor.alternateId
          ),
          session=dict(
            uid=$.authenticationContext.externalSessionId,
            issuer=$.authenticationContext.issuer.id
          )
        ),
        status=case(
          $.outcome.result == 'SUCCESS' => 'Success',
          $.outcome.result == 'FAILURE' => 'Failure',
          _ => 'Other'
        ),
        message=$.displayMessage,
        metadata=dict(
          uid=$.uuid,
          product=dict(
            name='Okta System Log',
            version=to_str($.version),
            vendor_name='Okta'
          ),
          version='1.3.0',
          profiles=array(
            'datetime',
            'host'
          ),
          event_code=$.eventType
        ),
        type_uid=300201,
        class_uid=3002,
        status_id=case(
          $.outcome.result == 'SUCCESS' => 1,
          $.outcome.result == 'FAILURE' => 2,
          _ => 99
        ),
        logon_type=case(
          $.transaction.type == 'WEB' => 'Interactive',
          $.transaction.type == 'API' => 'Remote',
          _ => 'Unknown'
        ),
        activity_id=case(
          $.eventType == 'user.session.start' => 1,
          $.eventType == 'user.session.end' => 2,
          _ => 99
        ),
        severity_id=1,
        category_uid=3,
        dst_endpoint=dict(
          svc_name=$.debugContext.debugData.url
        ),
        http_request=dict(
          uid=$.debugContext.debugData.requestId,
          user_agent=$.client.userAgent.rawUserAgent
        ),
        src_endpoint=dict(
          ip=$.client.ipAddress,
          os=dict(
            name=$.client.userAgent.os,
            type_id=case(
              upper($.client.userAgent.os) == 'MAC OS X' => 1,
              upper($.client.userAgent.os) == 'WINDOWS' => 2,
              upper($.client.userAgent.os) == 'LINUX' => 3,
              _ => 0
            )
          ),
          type=$.client.device,
          domain=$.securityContext.domain,
          location=dict(
            isp=$.securityContext.isp,
            lat=$.client.geographicalContext.geolocation.lat,
            city=$.client.geographicalContext.city,
            long=$.client.geographicalContext.geolocation.lon,
            country=$.client.geographicalContext.country,
            postal_code=$.client.geographicalContext.postalCode
          ),
          autonomous_system=dict(
            name=$.securityContext.asOrg,
            number=$.securityContext.asNumber
          )
        ),
        activity_name=case(
          $.eventType == 'user.session.start' => 'Logon',
          $.eventType == 'user.session.end' => 'Logoff',
          _ => 'Other'
        ),
        logon_type_id=case(
          $.transaction.type == 'WEB' => 1,
          $.transaction.type == 'API' => 2,
          _ => 0
        )
      )
    outputs:
      - sink
  - id: activity
    commandName: eval
    config: |-
      dict(
        api=dict(
          request=dict(
            uid=$.actor.id,
            data=dict(
              appUserName=$.debugContext.debugData.appUserName
            )
          ),
          service=dict(
            uid=$.target[0].id,
            name=$.target[0].displayName,
            labels=array(
              $.target[0].type
            )
          ),
          response=dict(
            code=case(
              $.outcome.result == "SUCCESS" => 200,
              _ => 400
            )
          ),
          operation="assignment.add"
        ),
        time=ts_str_to_epoch($.published, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"),
        actor=dict(
          user=dict(
            uid=$.actor.id,
            name=$.actor.displayName,
            uid_alt=$.actor.alternateId
          ),
          app_name="Okta"
        ),
        status=case(
          $.outcome.result == "SUCCESS" => "Success",
          _ => "Failure"
        ),
        message=$.displayMessage,
        metadata=dict(
          product=dict(
            name="Okta",
            feature=dict(
              name="Identity Management"
            )
          ),
          version="1.5.0",
          event_code=$.eventType,
          logged_time=ts_str_to_epoch($.published, "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")
        ),
        type_uid=600301,
        class_uid=6003,
        resources=array(
          dict(
            uid=$.target[0].id,
            name=$.target[0].displayName,
            type=$.target[0].type,
            uid_alt=$.target[0].alternateId
          )
        ),
        status_id=case(
          $.outcome.result == "SUCCESS" => 1,
          _ => 2
        ),
        activity_id=1,
        severity_id=1,
        status_code=case(
          $.outcome.result == "SUCCESS" => "200",
          _ => "400"
        ),
        category_uid=6,
        dst_endpoint=dict(
          uid=$.target[0].id,
          name=$.target[0].displayName,
          type="AppInstance",
          type_id=1,
          hostname=$.target[0].alternateId
        ),
        http_request=dict(
          user_agent=$.client.userAgent.rawUserAgent
        ),
        src_endpoint=dict(
          ip=$.client.ipAddress,
          type="Browser",
          type_id=8
        ),
        http_response=dict(
          code=case(
            $.outcome.result == "SUCCESS" => 200,
            _ => 400
          ),
          status=case(
            $.outcome.result == "SUCCESS" => "OK",
            _ => "Bad Request"
          ),
          message=$.displayMessage
        ),
        status_detail=$.displayMessage,
        timezone_offset=0
      )
    outputs:
      - sink
  - id: accountchange
    commandName: eval
    config: |-
      dict(
        time=ts_str_to_epoch($.published, 'yyyy-MM-dd\'T\'HH:mm:ss.SSS\'Z\''),
        actor=dict(
          user=dict(
            uid=$.actor.id,
            name=$.actor.displayName,
            type_id=case(
              $.actor.type == 'User' => 1,
              $.actor.type == 'Admin' => 2,
              _ => 0
            ),
            email_addr=$.actor.alternateId
          ),
          session=dict(
            uid=$.authenticationContext.externalSessionId
          )
        ),
        status=$.outcome.result,
        message=$.displayMessage,
        metadata=dict(
          uid=$.uuid,
          product=dict(
            name='Okta System Log',
            version=to_str($.version),
            vendor_name='Okta'
          ),
          version='1.3.0',
          profiles=array('datetime', 'host'),
          event_code=$.eventType
        ),
        class_uid=3001,
        status_id=case(
          $.outcome.result == 'SUCCESS' => 1,
          $.outcome.result == 'FAILURE' => 2,
          _ => 0
        ),
        activity_id=case(
          str_contains($.eventType, 'lock') => 4,
          str_contains($.eventType, 'create') => 1,
          str_contains($.eventType, 'update') => 2,
          str_contains($.eventType, 'delete') => 3,
          _ => 99
        ),
        severity_id=1,
        category_uid=3,
        http_request=dict(
          uid=$.debugContext.debugData.requestId,
          url=dict(
            path=$.debugContext.debugData.url
          ),
          user_agent=$.client.userAgent.rawUserAgent
        ),
        src_endpoint=dict(
          ip=$.client.ipAddress,
          os=dict(
            name=$.client.userAgent.os,
            type_id=case(
              upper($.client.userAgent.os) == 'ANDROID' => 1,
              upper($.client.userAgent.os) == 'IOS' => 2,
              upper($.client.userAgent.os) == 'WINDOWS' => 3,
              upper($.client.userAgent.os) == 'LINUX' => 4,
              upper($.client.userAgent.os) == 'MACOS' => 5,
              _ => 0
            )
          ),
          type=$.client.device,
          domain=$.securityContext.domain,
          location=dict(
            isp=$.securityContext.isp,
            lat=$.client.geographicalContext.geolocation.lat,
            city=$.client.geographicalContext.city,
            long=$.client.geographicalContext.geolocation.lon,
            country=$.client.geographicalContext.country,
            postal_code=$.client.geographicalContext.postalCode
          ),
          autonomous_system=dict(
            name=$.securityContext.asOrg,
            number=$.securityContext.asNumber
          )
        ),
        activity_name=$.eventType
      )
    outputs:
      - sink
  - id: sink
    commandName: stdout
    config: '{"encodingType": "JSON_ARRAY"}'
    outputs: [ ]

Start Building with Fleak Today

Production Ready Transformation in Minutes

Request a Demo

Start Building with Fleak Today

Production Ready Transformation in Minutes

Request a Demo