OPAL 102 – Shaping Structured and Unstructured Data Using Stages

Follow this tutorial to understand using OPAL to shape your data using Stages.

Scenario

In this scenario, you have an IoT system sending weather data in two formats. You want to manipulate the data and create a coherent picture of the weather.

In this tutorial, you learn how to manipulate your data using OPAL, the language used by Observe to perform database tasks.

Ingesting Data into Observe

Before working with OPAL, you must ingest your data into Observe using a Datastream and an Ingest Token. Use the following steps to start your ingestion process:

  1. Log into your Observe instance, and click Settings Settings icon in the left menu.

  2. From the menu, select Workplace Settings.

  3. From the left menu, select Datastreams.

Creating a Datastream for ingesting data

Figure 1 - Creating a Datastream for Ingesting Data

4. Click Create Datastream.

5. Enter OPAL102Token as the Name.

6. For the Description, enter a short description of the Datastream. (Optional)

Add Datastream Name and Description

Figure 2 - Add Datastream Name and Description

7. Click Create.

Datastream Created

Figure 3 - Datastream Created

Adding an Ingest Token to Your Datastream

To allow data into your Datastream, you must create a unique Ingest Token. Use the following steps to create an Ingest Token:

  1. To add a new Ingest Token, click Create token.

  2. Enter OPAL102Token as the Name.

  3. For the Description, enter a short description of the Ingest Token. (Optional)

Add Ingest Token Name and Description

Figure 4 - Add Ingest Token Name and Description

4. Click Continue.

5. You receive a confirmation message confirming the creation of your token creation and the token value.

6. Click Copy to clipboard to copy the token value and then save it in a secure location.

Note

You can only view the token once in the confirmation message. If you don’t copy and paste the token into a secure location, and you lose the token, you have to create a new one.

7. Once you copy it into a secure location, select the confirmation checkbox, and then click Continue.

Successfully creating an ingest token

Figure 5 - Successfully creating an Ingest Token

8. After you click Continue, Observe displays Setup instructions for using the new Ingest Token.

Setup instructions for using the Ingest Token

Figure 6 - Setup instructions for using the Ingest Token

The URL, https://{customerID}.collect.observeinc.com/, displays your customer ID and URL for ingesting data.

Note

Some Observe instances may optionally use a name instead of Customer ID; if this is the case for your instance, contact your Observe Data Engineer to discuss implementation. A stem name will work as is, but a DNS redirect name may require client configuration.

HTTP requests must include your customer ID and Ingest Token information.

You can view the Setup instructions on your Datastream’s About tab.

Using cURL to Ingest Data into the Tutorial Datastream

This tutorial uses two log files containing weather data that you must send to your Observe instance:

Or

Download the files here.

Copy the files to the following directory:

  • MacOS - home/<yourusername>

  • Windows - C:\Users\<yourusername

Send this data to the v1/http endpoint using cURL. See EndPoints in the Observe User Guide for more details.

Details about the URL

  • The path component of the URL encoded as a tag.

  • Query parameters also encoded as tags.

Before you ingest the logs, check the health of the Observe API instance using the following cURL:

curl https://api.observe-eng.com/v1/health

You should see the following response from the instance:

{"ok":true,"message":"Everything's perfectly all right now. We're fine. We're all fine here, now, thank you. How are you?"}%

To accomplish this, add the path, OPAL102, and the query parameter, format. Keep this information in mind for use later in the tutorial.

Copy and paste the following cURL to ingest the log data into the Datastream. Replace ${CustomerID} with your Customer ID, such as 123456789012. Replace{token_value} with the token you created in Adding an Ingest Token to Your Datastream.

curl "https://${CustomerID}.collect.observeinc.com/v1/http/OPAL102?format=log" \
-H "Authorization: Bearer {token_value}" \
-H "Content-type: text/plain" \
--data-binary @./Measurements.MessyLog.log
curl "https://${CustomerID}.collect.observeinc.com/v1/http/OPAL102?format=jsonarray" \
-H "Authorization: Bearer {token_value}" \
-H "Content-type: application/json" \
--data-binary @./Measurements.ArrayOf.json

Each cURL command returns {"ok":true} when successful.

After ingesting the data, you should see the data in the Datastream, OPAL102Token.

Successfully ingesting data into the datastream

Figure 7 - Successfully ingesting data into the datastream

  1. Click Open dataset to view the ingested data.

  2. Select a row to display more information about that log event.

Displaying details about a row of ingested data

Figure 8 - Displaying details about a row of ingested data

OPAL and Processing in Stages

What is OPAL?

An OPAL pipeline consists of a sequence of statements where the output of one statement creates the input for the next. This could be a single line with one statement or many lines of complex shaping and filtering.

A pipeline contains four types of elements:

  • Inputs - define the applicable datasets.

  • Verbs - define what processing to perform with those datasets.

  • Functions - define transforming individual values in the data.

  • Outputs - pass a dataset on to the next verb or the final result.

In this tutorial, you continue to build on concepts and tasks described in OPAL 101 Tutorial and become familiar with the concepts of structured and semi-structured data, manipulating arrays, conditional data shaping, stages, and subqueries.

Introducing Stages and the Configuration Plan

In this exercise, you have an IoT system sending weather data. The system sends data in two formats:

  • Measurements.ArrayOf.json file sent in an application/json format, where each semi-structured chunk of data automatically treated by Observe as a complete observation.

  • Measurements.MessyLog.log file sent as text/plain format, with some unstructured data and some semi-structured data on multiple lines. Observe treats each line of the file as a separate observation. Log tracing systems send this data format because it has no knowledge about the type of message.

When you ingest the files, Observe stores them in the Observe DataStream. You can manipulate different types of messages and normalize them into a coherent whole using a concept called a Stage. A stage consists of an individual results table in a Worksheet. Stages can be linked and chained together in sequences and branching.

Building Stages Hierarchy and Graph

In this scenario, you perform the following tasks:

  • Create Stage A - All Records by Type to decide the incoming format from the Datastream.

  • Create Stage B - Structured Records derived from Stage A to convert unstructured datasets from Measurements.MessyLog.log into structured data.

  • Create a Stage C - Unstructured Records derived from Stage A to turn structured datasets from Measurements.ArrayOf.json and perform light processing to match the structure from Stage B.

  • Join data from Stage B and Stage C together in Stage D - Structured And Unstructured Records and perform the additional processing and formatting on the joined data.

Workflow for data shaping

Figure 9 - Workflow for Data Shaping

Creating Stage A - All Records By Type

  1. To create a Worksheet and start modeling your data, click Create Worksheet at the top of your OPAL102Token Dataset.

Create a worksheet from the Dataset

Figure 10 - Create a Worksheet from the OPAL102Token Dataset

2. Rename the name of the only stage in the worksheet from Stage 1 to Stage A - All Records By Type.

Changing the name of the Stage

Figure 11 - Change the Stage name to Stage A - All Records by Type

3. Add the following OPAL code in the OPAL console:

// Determine the type of observations sent to Observe
make_col format:string(EXTRA.format)
Adding OPAL code in the OPAL console

Figure 12 - Adding OPAL code in the OPAL console

Classify Records using the filter function

Create a format column containing either jsonarray or log values based on parameters of the query string of cURL requests that sent the data to Observe.

This type of basic extraction from raw observations and filtering should be familiar from the OPAL 101 Tutorial example.

Creating Stage B - Structured Records

  1. Add a second stage that builds on the previous Stage A by clicking Link New Stage at the bottom of the Worksheet.

  2. Rename the stage to Stage B - Structured Records.

Adding Stage B

Figure 13 - Adding Stage B - Structured Records

Add the following OPAL code to your OPAL console:

// Step 1: Filter to only the events structured in JSON format

filter format = "jsonarray"

// Step 2: For records that contain JSON, the entire structured payload is in the FIELDS as an VARIANT/any

rename_col temperatureMeasurement:FIELDS

// Step 3: Filter to only those measurements with valid timestamps in the JSON structure

filter path_exists(temperatureMeasurement, "current.dt")

// Step 4: In this type of message, duration of measurement is stored in the nice property of JSON object

make_col measurementDuration:duration_sec(int64(temperatureMeasurement.measurementDur))

Step 1 - Filtering Data using the filter Function

// Step 1: Filter to only the events structured in JSON format

filter format = "jsonarray"

In the OPAL code, Step 1 uses the filter function to filter the data to only records containing the jsonarray value in the format column.

Step 2 - Renaming an Existing Column using the rename_col Function

// Step 2: For records that contain JSON, the entire structured payload is in the FIELDS as a VARIANT/any

rename_col temperatureMeasurement:FIELDS

Step 2 renames the existing column using the rename_col function. These records ingested into Observe with the application/json format, and automatically stored in the FIELDS column in any data type. This datatype is ready to be used.

Use the rename_col projection operation to simply rename the FIELDS column to temperatureMeasurement. Not creating a new column saves processing resources in the data conversion pipeline.

Step 3 - Checking for Key Existence using the path_exists Function

// Step 3: Filter to only those measurements with valid timestamps in the JSON structure
filter path_exists(temperatureMeasurement, "current.dt")

In this example, one of the measurements does not contain the current.dt key, and for purposes of this tutorial, considered invalid. This OPAL code in Step 3 uses the path_exists function to test for the existence of this key.

Valid MeasurementInvalid Measurement
{ "measurementDur": 128,
"lat": 59.31,
"lon": -64.5,
"current": {
"dt": 1646348698,
"temp": -2.0
}
, "past30Days": [10, …]
}
{ "measurementDur": 256,
"lat": 59.31,
"lon": -64.5,
"current": {
"temp": 27.01 },
"past30Days": [16, …]
}

Step 4 - Converting a Numeric Value to a Duration using duration_sec Function

//Step 4: In this type of message, duration of measurement is stored in the nice property of JSON object
make_col measurementDuration:duration_sec(int64(temperatureMeasurement.measurementDur))

Step 4 converts numeric values,measurementDur, to durations using the duration_sec function.

Converting numeric value to duration value

Figure 14 - Converting Numeric Values to Duration Values

Creating Stage C - Unstructured Records

  1. Return to Stage A and click Link New Stage to add a third stage.

  2. Rename the new stage to Stage C - Unstructured Records.

Add the following block of OPAL code and click Run:

// Step 1: Filter to only the events from the log file that comes in in form of single event per line

filter format = "log"

// Step 2: For records that contain text and JSON as a string, the 
payload is in the FIELDS.text string

make_col logRaw:string(FIELDS.text)

// Step 3: Each line of the log file is a separate row. Merge multiline input into a single logical event

merge_events 
  match_regex(logRaw, /\[\d{2}\/\d{2}\/\d{4}.*\]\[Thermostat.*\]/), 
  options(max_size: 1000, max_interval: 1m),  
  logMessageMultiline:string_agg(logRaw, ""), 
  order_by(BUNDLE_ID, OBSERVATION_INDEX), 
  group_by(format)

// Step 4: Extract the log message and the temperature measurement

extract_regex logMessageMultiline, /(?P<logMessageFromText>\[.*]Observed temperature:)\s(?P<temperatureMeasurement>{.*})/, "ims"

// Step 5: Parse temperature measurement json. Because  we are joining 
this to another column of "object" datatype, you coerce it from "any"/variant datatype returned from parse_json to the "object" datatype

make_col temperatureMeasurement:object(parse_json(temperatureMeasurement))

// Step 6: Filter to only those measurements that had valid timestamps in the JSON structure

filter path_exists(temperatureMeasurement, "current.dt")

// Step 7: In this log, duration of measurement is stored in the messy CSV label if the message
// Extract the comma-separated value of interesting context data from freeform text

extract_regex logMessageFromText, /\[context:(?P<measurementContextRaw>.*)\]\[/, "i"

// Parse the comma-separated context value

make_col measurementContextCsv:parse_csv(measurementContextRaw, false)

// The third item in the comma-separated string is an length of duration. You can easily index into it for display

make_col measurementDuration:duration_sec(int64(measurementContextCsv[2]))
Adding OPAL code in the OPAL console

Figure 15 - Stage C after Running OPAL Code

Step 1 - Filter Data using the filter Function

This step filters data from Stage A to contain only records with the value, log, in the format column. Focusing only on the desired records saves processing resources.

Observe ingests these records in the text/plain format and automatically stores them in the FIELDS column as an object of any data type with each incoming line in a text key, for example,

{"text":"[05/05/2023 13:59:18.847][Thermostat/112.0.0.0][context:2018/2018,0,64,330130][I:SH2WB01IN1AWN]Observed temperature: {\r\n"}
{"text":"    \"lon\": -74.5,\r\n"}
{"text":"[05/05/2023 13:59:18.847][Thermostat/112.0.0.0][context:2018/2018,0,32,330130][I:SH2WB01IN1AWN]Observed temperature: {\r\n"}
{"text":"        \"dt\": 1646338698,\r\n"}
{"text":"        \"dt\": 1646318698,\r\n"}
{"text":"    \"lat\": 39.31,\r\n"}
{"text":"    \"current\": {\r\n"}

Step 2 - Project New Object Column using make_col Function

Each row contains a single observation, but Observe can merge the related items into a single value. Use the make_col projection operation to extract the FIELDS.text key and cast the value to logRaw string column. This allows you to prepare to merge events in the next step.

Step 3 - Combining Events with the merge_events Aggregate Verb

Each line of the log file is a separate row. Merge multi-line input into a single logical event.

Use the merge_events aggregate verb to merge consecutive events into a single event.

Details about each OPAL statement:

// Step 3: Each line of the log file is a separate row. Merge multiline input into a single logical event
merge_events 
//^^^^^^^^^^                                                        Name of the function
  match_regex(logRaw, /\[\d{2}\/\d{2}\/\d{4}.*\]\[Thermostat.*\]/), 
//^^^^^^^^^^                                                        match_regex function
//           ^^^^^^^                                                column to scan for values
//                    ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^   regex to match that indicates beginning of the message
  options(max_size: 1000, max_interval: 1m),
//^^^^^^^                                                           options function
//        ^^^^^^^^                                                  maximum number of events to merge into single event
//                        ^^^^^^^^^^^^                              maximum duration between events
  logMessageMultiline:string_agg(logRaw, ""), 
//^^^^^^^^^^^^^^^^^^^                                               name of column that will contain new event
//                    ^^^^^^^^^^                                    the aggregate expression that combines multiple events into single one
//                               ^^^^^^                             column to operate on
//                                       ^^                         delimiter to combine events. In our case it's empty
  order_by(BUNDLE_ID, OBSERVATION_INDEX), 
//         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^                             how to order records
  group_by(format)
//         ^^^^^^                                                   what to group the events by

To match the first row for merge_events, use match_regex function to find the row that begins the event.

The options function is a special function to provide options to modify the behavior of various functions. It takes key/value pairs of modifiers. For the merge_events function, use max_size to indicate the maximum number of events to merge and max_interval and the maximum duration between the first and last event. For this tutorial, the selected value, 1 minute, could be too generous, but for real-world examples, it is possible to have some time skew between incoming records that should be merged. Larger match intervals can lead to using more computational credits used in processing data.

The string_agg aggregate function combines multiple values separated by a delimiter.

The order_by clause forces the right sequence of events to be combined. The default time order for order_by is time-ascending. When records are ingested using the cURL command, all of the timestamps for those records are within nanoseconds of each other, and all file contents have the same BUNDLE_ID. Each line has a unique, increasing OBSERVATION_INDEX, making it easy to reconstruct the order of lines.

If you don’t specify group_by, the default grouping for merge_events becomes a set of primary key columns. In this example, you have not defined any primary keys and for completeness, use the format column, which in this stage is always the same.

Step 4 - Using extract_regex Function to Split Unstructured and Structured Data from a String

Extract the log message and the temperature measurement using the following OPAL code.

You reconstructed multiple lines of ingested log files into a single event in logMessageMultiline string column, and in this section, use the extract_regex verb to convert unstructured strings into structured data. This produces the logMessageFromText and the temperatureMeasurement string columns with the latter one containing a JSON string to parse. See the previous [tutorial] for a more detailed breakdown explanation of extract_regex.

Step 5 - Creating an Object from a JSON String Using parse_json and object Functions

Parse the temperature measurement JSON. Because you joined this to another column of object datatype, extract it from the "any"/variant datatype returned from parse_json to the object datatype.

OPAL Details

make_col temperatureMeasurement:object(parse_json(temperatureMeasurement)) 
//       ^^^^^^^^^^^^^^^^^^^^^^                                            New column name
//                              ^^^^^^                                     casting of 'any' type variable to 'object' type variable
//                                    ^^^^^^^^^^^                          parsing of json from text
//                                                ^^^^^^^^^^^^^^^^^^^^^^   text value to parse

The parse_json function parses the string into a temperatureMeasurement column of any data type which can be an object or an array data type. In a later stage when you combine this Stage C - Unstructured Records with results of Stage B - Structured Records, the types of columns must match, and the temperatureMeasurement column in Stage B - Structured Records is already an object data type. Therefore, use the object function to coerce this column to the object data type.

Step 6 - Checking for Key Existence in Object Data with path_exists Function

This step uses the same logic as Step 3 of Creating Stage B - Structured Records.

Step 7 - Parsing CSV Strings Using the parse_csv Function

In this log, the duration of the measurement is stored in the messy CSV label and you extract the comma-separated value of interesting context data from free-form text.

OPAL Details

// Extract the comma-separated value of interesting context data from freeform text
extract_regex logMessageFromText, /\[context:(?P<measurementContextRaw>.*)\]\[/, "i"
//                                               ^^^^^^^^^^^^^^^^^^^^^^  Column to extract with regular expression
// Parse the comma-separated context value
make_col measurementContextCsv:parse_csv(measurementContextRaw)
//                             ^^^^^^^^^                                 Parsing of CSV value
//                                       ^^^^^^^^^^^^^^^^^^^^^           Column to parse CSV from
//             
// The third item in the comma-separated string is the length of duration. 
make_col measurementDuration:duration_sec(int64(measurementContextCsv[2]))
//                           ^^^^^^^^^^^^                                Convert the value to duration in seconds

The example log message, when stripped of the JSON, contains a context block with some values:

[05/05/2023 13:59:18.847][Thermostat/112.0.0.0][context:2018/2018,0,16,330130][I:SH2WB01IN1AWN]Observed temperature:
                                                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ you want this value

In this message, you extract the comma-separated string with the extract_regex function, and use the parse_csv function to convert it into an array.

The third item in the comma-separated string is the duration length. You can index it for display purposes.

The third value of the array, arrays in OPAL are 0 based, so index 2, contains the duration of the measurement taken by the sensor which you change to duration data type using the duration_sec function, just like in Step 4 of Stage B - Structured Records.

Creating Stage D - Structured and Unstructured Records

You have split the processing of data from Stage A into Stage B and Stage C. Now you bring the processing of the data back into a single stage.

Add a stage that links to Stage B using the following steps:

  1. Return to Stage B.

  2. Click Link New Stage to add a fourth stage.

  3. Rename the new stage to Stage D - Structured and Unstructured Records.

  4. Click Input next to the OPAL console and then click Add Input.

  5. Select the Stage C - Unstructured Records stage.

When you add input to the Stage, Observe creates a reference to another Stage with a certain auto-generated suffix. If you named your Stages as presented in the tutorial, the name of Stage reference should be Stage C - Unstructured Records_-tc62. If not, then adjust your OPAL input accordingly.

Add the following block of OPAL code to the OPAL console:

// Step 1: Combine Stages B and C. B is implied as the first Stage in the Inputs tab.
union @"C - Unstructured Records_-tc62"

// Step2: Extract a couple of values from JSON
make_col lat:float64(temperatureMeasurement.lat), lon:float64(temperatureMeasurement.lon)

// Step 3: Parse temperature 3 different ways
// This is the cleanest, platform native way
make_col tempValue:float64(temperatureMeasurement.current.temp)
// For fans of using explicit object navigation, newtonsoft/json.net and gson style
make_col tempValue_alt1:get_field(object(get_field(temperatureMeasurement, "current")), "temp")
// For fans of jq-style extraction, use JMESPath expressions
make_col tempValue_alt2:get_jmespath(temperatureMeasurement, "current.temp")

// Step 4: Get a temperature gauge with conditional formatting. Unicode! Emojis!
make_col "🌡️ temp":case(
    tempValue > 25,  "☀️ warm",
    tempValue > 10,  "🌦️ sorta warm",
    tempValue >= 0, "🌩️ cold",
    true, "❄️ freezing")

// Step 5: Create an explicit array from JSON object that contains it
make_col past30DaysTemp:array(temperatureMeasurement.past30Days)
// How many days ago has the temperature hit exactly 20?
make_col howLongAgo20Degrees:array_length(past30DaysTemp) - index_of_item(past30DaysTemp, 20)
// Get a slice of the array showing last 10 days only
make_col past10DaysTemp:slice_array(past30DaysTemp, -10)

// Step 6: Choose the columns, sometimes renaming them. Drop all the others
pick_col 
    BUNDLE_TIMESTAMP,
    lat,
    lon,
    tempValue,
    @."🌡️ temp", // Needs special treatment, using @ as current stage name, and quoted around emojis    howLongAgo20Degrees,
    past10DaysTemp,
    past30DaysTemp,
    measurementDuration,
    temperatureMeasurement,
    format
Adding OPAL code in the OPAL console

Figure 16 - Stage D after Running OPAL Code

Step 1 - Combining the Results of Multiple Stages with the union Verb

Use the union verb to combine results of B - Structured Records and C - Unstructured Records stages.

If you named the stages as stated in each section, the name of the stage reference should be C - Unstructured Records_-tc62 because Observe adds a small semi-random hash to the name of the variable.

Unlike direct database engines where the unioned tables are often mandated to be identical, in Observe, the union verb does not require this. The two stages don’t fully match, and any records where columns are not shared receive NULL values in the opposite dataset.

Step 2 - Extracting Numeric Values from a JSON object using the make_col Verb and float64 Function

In this step, you extract latitude and longitude from the JSON object representing the measurement of data. The object was reconstructed from multiple text lines in the Stage C - Unstructured Records and turned into an object, and parsed the same way as the object that arrived to Observe already in one piece.

Step 3 - Extracting Numeric Values from a JSON Object Natively with get_field or get_jmespath Functions

In this step, you can see three ways of addressing JSON object paths using OPAL.

In the sample data, the observation has the following format:

{
    "measurementDur": 16,
    "lat": 49.31,
    "lon": -74.5,
    "current": {
        "dt": 1646318698,
        "temp": 22.21
    },
    "past30Days": [16, 15, ..., 18]
}

If you want to obtain the value of the temp key, a child of the current object, use OPAL with . (dot) notation and [0] indexers.

If you want to use explicit object navigation, OPAL uses a get_field function to extract a key from a given object. The function produces the same result as before, but uses more OPAL to do so.

//use explicit object navigation, newtonsoft/json.net and gson style
make_col tempValue_alt1:get_field(object(get_field(temperatureMeasurement, "current")), "temp")
//                      ^^^^^^^^^                                                        ^^^^ Get the 'temp' property of child object
//                                ^^^^^^
//                                       ^^^^^^^^^                          ^^^^^^^^          Get the 'current' property of parent object
//                                                 ^^^^^^^^^^^^^^^^^^^^^^                     Parent object 

As a third option, OPAL implements a get_field function to use JMESPath, a query language for JSON. Unlike the previous two functions, get_field may be more expensive to use.

//use JMESPath expressions
make_col tempValue_alt2:get_jmespath(temperatureMeasurement, "current.temp")

Step 4 - Using the case Function for Conditional Evaluation

4. Create a temperature gauge with conditional formatting and emojis.

Step 4 introduces conditional evaluation using the case function. OPAL evaluates conditions and results in pairs in the order of argument. The first successful evaluation to true ends execution.

This step also highlights that Observe platform allows for a bit of color and fun with any Unicode character, including emojis, in both the values and in field names.

Step 5 - Supporting Arrays with the array, array_length, index_of_item, and slice_array Functions

Like all programming languages, OPAL supports arrays for creation, measurement, slicing, and indexing. Step 5 uses an array function to convert an implicit array from a JSON object to a real array, and an array_length function to measure the length of the array. The step also uses the index_of_item function to find an index of a specific item in the array, and the slice_array function to take the last 10 items from the longer array.

Step 6 - Using the pick_column Verb to Select Columns to Display

You’ve created a lot of intermediate columns so far in this tutorial. Now you can use the OPAL pick_col verb to choose the columns you want to see as the final result and optionally rename them.

Note to reference a field with non-alphanumeric characters like the fun emoji thermometer, you must use double quotes and prepend with @..

Creating Stage E - Use One Stage with Subqueries (optional)

A standard OPAL query consists of a linear series of steps with one or more dataset inputs. Using Stages as shown in this tutorial makes it possible to isolate portions of processing into logically isolated units of work and build data flows that split and merge as needed.

You can combine all the work you’ve done in earlier Stages into a single Stage with a single OPAL script. Observe enables this using the concept of Subquery. Logically, a Subquery is similar to a Stage using an input Dataset.

A subquery consists of a name prefixed by the @ character followed by the definition in the curly brackets { with your OPAL inside}. You then invoke the subquery. In the following example, <- @ indicates the use of the default primary input, weather, without explicitly specifying it by name. The final <- @test_a {} invokes @test_a to generate the result:

// Starting from some dataset, which contains the field "city"
@test_a <- @ {
  filter city="Chicago"
}

// this is the final result of the entire query with subquery @test_a as its input
<- @test_a {}

Adding a New Stage

  1. Select the OPAL102Token Event Dataset.

  2. Rename the stage to Stage E - Using just One Stage with Subqueries.

Adding OPAL Statements to the New Stage

Add the following OPAL statements to the OPAL console, and click RUN.

// Determine the type of observations sent to Observe
make_col format:string(EXTRA.format)

// Subquery B: Just like Stage B - Structured Records
@subquery_B_structured_records <- @ {
  // Step 1: Filter to only the events that are structured in JSON format
  filter format = "jsonarray"
  
  // Step 2: For records that contain JSON, entire structured payload is in the FIELDS as an VARIANT/any
  make_col temperatureMeasurement:FIELDS
  
  // Step 3: Filter to only those measurements that had valid timestamps in the JSON structure
  filter path_exists(temperatureMeasurement, "current.dt")
  
  // Step 4: In this type of message, duration of measurement is stored in the nice property of JSON object
  make_col measurementDuration:duration_sec(int64(temperatureMeasurement.measurementDur))
}

// Subquery C: Just like Stage C - Unstructured Records
@subquery_C_unstructured_records <- @ {
  // Step 1: Filter to only the events from the log file that come in in form of single event per line
  filter format = "log"
  
  // Step 2: For records that contain text and JSON as a strinng, the payload is in the FIELDS.text string
  make_col logRaw:string(FIELDS.text)
  
  // Step 3: Each line of the log file is a separate row. Merge multiline input into single logical event
  merge_events 
    match_regex(logRaw, /\[\d{2}\/\d{2}\/\d{4}.*\]\[Thermostat.*\]/), 
    options(max_size: 1000, max_interval: 1m),  
    logMessageMultiline:string_agg(logRaw, ""), 
    order_by(BUNDLE_ID, OBSERVATION_INDEX), 
    group_by(format)
  
  // Step 4: Extract the log message and the temperature measurement
  extract_regex logMessageMultiline, /(?P<logMessageFromText>\[.*]Observed temperature:)\s(?P<temperatureMeasurement>{.*})/, "ims"
  
  // Step 5: Parse temperature measurement json. Because  we are joining this to another column of "object" datatype,
  // coerce it from "any"/variant datatype returned from parse_json to the "object" datatype
  make_col temperatureMeasurement:object(parse_json(temperatureMeasurement))
  
  // Step 6: Filter to only those measurements that had valid timestamps in the JSON structure
  filter path_exists(temperatureMeasurement, "current.dt")
  
  // Step 7: In this log, duration of measurement is stored in the messy CSV label if the message
  // Extract the comma-separated value of interesting context data from freeform text
  extract_regex logMessageFromText, /\[context:(?P<measurementContextRaw>.*)\]\[/, "i"
  // Parse the comma-separated context value
  make_col measurementContextCsv:parse_csv(measurementContextRaw)
  // The third item in the comma-separated string is an length of duration. We can easily index into it for display
  make_col measurementDuration:duration_sec(int64(measurementContextCsv[2]))
}

// Unioning things together: Just like Stage D - Structured And Unstructured Records, combining back to the parent
<- @subquery_B_structured_records {
    union @subquery_C_unstructured_records
}

// Step2: Extract a couple of values from JSON
make_col lat:float64(temperatureMeasurement.lat), lon:float64(temperatureMeasurement.lon)

// Step 3: Parse temperature 3 different ways
// This is the cleanest, platform native way
make_col tempValue:float64(temperatureMeasurement.current.temp)
// For fans of using explicit object navigation, newtonsoft/json.net and gson style
make_col tempValue_alt1:get_field(object(get_field(temperatureMeasurement, "current")), "temp")
// For fans of jq-style extraction, use JMESPath expressions
make_col tempValue_alt2:get_jmespath(temperatureMeasurement, "current.temp")

// Step 4: Get a temperature gauge with conditional formatting. Unicode! Emojis!
make_col "🌡️ temp":case(
    tempValue > 25,  "☀️ warm",
    tempValue > 10,  "🌦️ sorta warm",
    tempValue >= 0, "🌩️ cold",
    true, "❄️ freezing")

// Step 5: Create an explicit array from JSON object that contains it
make_col past30DaysTemp:array(temperatureMeasurement.past30Days)
// How many days ago has the temperature hit exactly 20?
make_col howLongAgo20Degrees:array_length(past30DaysTemp) - index_of_item(past30DaysTemp, 20)
// Get a slice of the array showing last 10 days only
make_col past10DaysTemp:slice_array(past30DaysTemp, -10)

// Step 6: Choose the columns, sometimes renaming them. Drop all the others
pick_col 
    BUNDLE_TIMESTAMP,
    lat,
    lon,
    tempValue,
    @."🌡️ temp", // Needs special treatment, using @ as current stage name, and quoted around emojis
    howLongAgo20Degrees,
    past10DaysTemp,
    past30DaysTemp,
    measurementDuration,
    temperatureMeasurement,
    format

Defining the Subqueries and Interactions

// ... common processing before subqueries

// Subquery B: Just like Stage B - Structured Records
@subquery_B_structured_records <- @ {
  ... OPAL code for Stage B - Structured Records omitted ... 
}

// Subquery C: Just like Stage C - Unstructured Records
@subquery_C_unstructured_records <- @ {
  // OPAL code for Stage C - Unstructured Records omitted
}

// Unioning things together: Just like Stage D - Structured And Unstructured Records, combining back to the parent
<- @subquery_B_structured_records {
    union @subquery_C_unstructured_records
}

// ... common processing after subqueries

You defined two subqueries subquery_B_structured_records and subquery_C_unstructured_records with the relevant code blocks and execution in parallel.

You then merged the results together by unioning subquery_C_unstructured_records to subquery_B_structured_records and sending the results back to the primary dataset using the <- operator.

The results of running this single block of OPAL should be identical to what you see in Stage D - Structured And Unstructured Records.

Adding OPAL code in the OPAL console

Figure 17 - Stage E after Running OPAL Code

Wrapping Up the Tutorial

To complete the tutorial, click the Pencil Pencil icon icon at the top of your Worksheet and name your Worksheet OPAL 102 Building Structured and Unstructured Example. Click Save to save the Worksheet.

Summary of Key Topics in the OPAL 102 Tutorial

In this tutorial, you learned how to perform the following tasks:

  • Create a Datastream to ingest data

  • Create an Ingest Token for the Datastream

  • Use cURL to ingest your log files into Observe in different formats

  • Create a Worksheet from your Dataset

  • Merge multiple events from multiple observation into single event

  • Use structured data to parse and manipulate JSON

  • Parse comma-separated values

  • Use array functions

  • Use type coercion

  • Use subqueries

  • Use Stages to split processing into logical chunks focused on just the applicable data

  • Use named subqueries to split processing