OPAL 102 – Shaping Structured and Unstructured Data Using Stages¶
Follow this tutorial to understand using OPAL to shape your data using Stages.
Scenario¶
In this scenario, you have an IoT system sending weather data in two formats. You want to manipulate the data and create a coherent picture of the weather.
In this tutorial, you learn how to manipulate your data using OPAL, the language used by Observe to shape and process datasets.
Ingesting Data into Observe¶
Before working with OPAL, you must ingest your data into Observe using a Datastream and an Ingest Token. Use the following steps to start your ingestion process:
1. Log into your Observe instance, and click Settings in the left menu.
2. From the menu, select Workspace Settings.
3. From the left menu, select Datastreams.
Figure 1 - Creating a Datastream for Ingesting Data
4. Click Create Datastream.
5. Enter OPAL102Token as the Name.
6. For the Description, enter a short description of the Datastream. (Optional)
Figure 2 - Add Datastream Name and Description
7. Click Create.
Figure 3 - Datastream Created
Adding an Ingest Token to Your Datastream¶
To allow data into your Datastream, you must create a unique Ingest Token. Use the following steps to create an Ingest Token:
1. To add a new Ingest Token, click Create token.
2. Enter OPAL102Token as the Name.
3. For the Description, enter a short description of the Ingest Token. (Optional)
Figure 4 - Add Ingest Token Name and Description
4. Click Continue.
5. You receive a confirmation message confirming the creation of your token and showing the token value.
6. Click Copy to clipboard to copy the token value and then save it in a secure location.
Note
You can only view the token once, in the confirmation message. If you don’t copy the token to a secure location and you lose it, you must create a new one.
7. Once you copy it into a secure location, select the confirmation checkbox, and then click Continue.
Figure 5 - Successfully creating an Ingest Token
8. After you click Continue, Observe displays Setup instructions for using the new Ingest Token.
Figure 6 - Setup instructions for using the Ingest Token
The URL, https://{customerID}.collect.observeinc.com/, contains your customer ID and is the endpoint for ingesting data.
Note
Some Observe instances may optionally use a name instead of Customer ID; if this is the case for your instance, contact your Observe Data Engineer to discuss implementation. A stem name will work as is, but a DNS redirect name may require client configuration.
HTTP requests must include your customer ID and Ingest Token information.
You can view the Setup instructions on your Datastream’s About tab.
Using cURL to Ingest Data into the Tutorial Datastream¶
This tutorial uses two log files containing weather data that you must send to your Observe instance.
Download the files here.
Copy the files to the following directory:
macOS - home/<yourusername>
Windows - C:\Users\<yourusername>
Send this data to the v1/http endpoint using cURL. See Endpoints in the Observe User Guide for more details.
Details about the URL:
The path component of the URL is encoded as a tag.
Query parameters are also encoded as tags.
Before you ingest the logs, check the health of the Observe API instance using the following cURL command:
curl https://api.observe-eng.com/v1/health
You should see the following response from the instance:
{"ok":true,"message":"Everything's perfectly all right now. We're fine. We're all fine here, now, thank you. How are you?"}%
To tag the tutorial data, add the path, OPAL102, and the query parameter, format, to the ingest URL, as shown in the cURL commands below. Keep this information in mind for use later in the tutorial.
Copy and paste the following cURL commands to ingest the log data into the Datastream. Replace ${CustomerID} with your Customer ID, such as 123456789012, and replace {token_value} with the token you created in Adding an Ingest Token to Your Datastream.
curl "https://${CustomerID}.collect.observeinc.com/v1/http/OPAL102?format=log" \
-H "Authorization: Bearer {token_value}" \
-H "Content-type: text/plain" \
--data-binary @./Measurements.MessyLog.log
curl "https://${CustomerID}.collect.observeinc.com/v1/http/OPAL102?format=jsonarray" \
-H "Authorization: Bearer {token_value}" \
-H "Content-type: application/json" \
--data-binary @./Measurements.ArrayOf.json
Each cURL command returns {"ok":true} when successful.
After ingesting the data, you should see the data in the Datastream, OPAL102Token.
Figure 7 - Successfully ingesting data into the datastream
Click Open dataset to view the ingested data.
Select a row to display more information about that log event.
Figure 8 - Displaying details about a row of ingested data
OPAL and Processing in Stages¶
What is OPAL?¶
An OPAL pipeline consists of a sequence of statements where the output of one statement creates the input for the next. This could be a single line with one statement or many lines of complex shaping and filtering.
A pipeline contains four types of elements (see the short example after this list):
Inputs - define the applicable datasets.
Verbs - define what processing to perform with those datasets.
Functions - transform individual values in the data.
Outputs - pass a dataset on to the next verb or the final result.
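For example, a minimal two-statement pipeline (using the format tag that this tutorial's cURL commands attach during ingestion) applies a function inside one verb and passes the output to the next verb:
// Function: string() transforms the raw EXTRA.format value into a string column
make_col format:string(EXTRA.format)
// Verb: filter consumes the output of the previous statement and keeps only matching rows
filter format = "log"
Here the input is the dataset the Worksheet is opened on, and the output of the final filter statement is the result passed on to the next stage or displayed.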
In this tutorial, you continue to build on concepts and tasks described in the OPAL 101 Tutorial and become familiar with structured and semi-structured data, manipulating arrays, conditional data shaping, stages, and subqueries.
Introducing Stages and the Configuration Plan¶
In this exercise, you have an IoT system sending weather data. The system sends data in two formats:
Measurements.ArrayOf.json file, sent in application/json format, where each semi-structured chunk of data is automatically treated by Observe as a complete observation.
Measurements.MessyLog.log file, sent in text/plain format, with some unstructured data and some semi-structured data on multiple lines. Observe treats each line of the file as a separate observation. Log tracing systems send this data format because they have no knowledge about the type of message.
When you ingest the files, Observe stores them in the Datastream. You can manipulate the different types of messages and normalize them into a coherent whole using a concept called a Stage. A Stage is an individual results table in a Worksheet. Stages can be linked and chained together in sequences and branches.
Building Stages Hierarchy and Graph¶
In this scenario, you perform the following tasks:
Create Stage A - All Records by Type to determine the incoming format from the Datastream.
Create Stage B - Structured Records, derived from Stage A, to take the structured datasets from Measurements.ArrayOf.json and perform light processing.
Create Stage C - Unstructured Records, derived from Stage A, to convert the unstructured datasets from Measurements.MessyLog.log into structured data that matches the structure from Stage B.
Join data from Stage B and Stage C together in Stage D - Structured and Unstructured Records and perform additional processing and formatting on the joined data.
Figure 9 - Workflow for Data Shaping
Creating Stage A - All Records By Type¶
1. To create a Worksheet and start modeling your data, click Create Worksheet at the top of your OPAL102Token Dataset.
Figure 10 - Create a Worksheet from the OPAL102Token Dataset
2. Rename the only stage in the worksheet from Stage 1 to Stage A - All Records By Type.
Figure 11 - Change the Stage name to Stage A - All Records by Type
3. Add the following OPAL code in the OPAL console:
// Determine the type of observations sent to Observe
make_col format:string(EXTRA.format)
Figure 12 - Adding OPAL code in the OPAL console
Classify Records using the filter Function¶
Create a format column containing either jsonarray or log values based on the query string parameters of the cURL requests that sent the data to Observe.
This type of basic extraction from raw observations and filtering should be familiar from the OPAL 101 Tutorial example.
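The later stages branch on this new column. For example, Stage B and Stage C each begin with a filter on it:
// Used at the start of Stage B - Structured Records
filter format = "jsonarray"
// Used at the start of Stage C - Unstructured Records
filter format = "log"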
Creating Stage B - Structured Records¶
Add a second stage that builds on the previous Stage A by clicking Link New Stage at the bottom of the Worksheet.
Rename the stage to Stage B - Structured Records.
Figure 13 - Adding Stage B - Structured Records
Add the following OPAL code to your OPAL console:
// Step 1: Filter to only the events structured in JSON format
filter format = "jsonarray"
// Step 2: For records that contain JSON, the entire structured payload is in the FIELDS as a VARIANT/any
rename_col temperatureMeasurement:FIELDS
// Step 3: Filter to only those measurements with valid timestamps in the JSON structure
filter path_exists(temperatureMeasurement, "current.dt")
// Step 4: In this type of message, duration of measurement is stored in the nice property of JSON object
make_col measurementDuration:duration_sec(int64(temperatureMeasurement.measurementDur))
Step 1 - Filtering Data using the filter Function¶
// Step 1: Filter to only the events structured in JSON format
filter format = "jsonarray"
In the OPAL code, Step 1 uses the filter function to keep only the records containing the jsonarray value in the format column.
Step 2 - Renaming an Existing Column using the rename_col Function¶
// Step 2: For records that contain JSON, the entire structured payload is in the FIELDS as a VARIANT/any
rename_col temperatureMeasurement:FIELDS
Step 2 renames the existing column using the rename_col function. These records were ingested into Observe in the application/json format and automatically stored in the FIELDS column as the any data type. This data type is ready to use.
Use the rename_col projection operation to simply rename the FIELDS column to temperatureMeasurement. Not creating a new column saves processing resources in the data conversion pipeline.
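For contrast, a make_col version of the same step (shown only as an alternative; the optional Stage E later in this tutorial uses it) would keep the original FIELDS column and carry an extra copy of the payload forward:
// Alternative: creates temperatureMeasurement while keeping FIELDS
make_col temperatureMeasurement:FIELDS
// Preferred here: reuses the existing column, so no extra data is carried forward
rename_col temperatureMeasurement:FIELDS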
Step 3 - Checking for Key Existence using the path_exists Function¶
// Step 3: Filter to only those measurements with valid timestamps in the JSON structure
filter path_exists(temperatureMeasurement, "current.dt")
In this example, one of the measurements does not contain the current.dt key and, for the purposes of this tutorial, is considered invalid. The OPAL code in Step 3 uses the path_exists function to test for the existence of this key.
| Valid Measurement | Invalid Measurement |
|---|---|
| { "measurementDur": 128, "lat": 59.31, "lon": -64.5, "current": { "dt": 1646348698, "temp": -2.0 }, "past30Days": [10, …] } | { "measurementDur": 256, "lat": 59.31, "lon": -64.5, "current": { "temp": 27.01 }, "past30Days": [16, …] } |
Step 4 - Converting a Numeric Value to a Duration using the duration_sec Function¶
// Step 4: In this type of message, duration of measurement is stored in the nice property of JSON object
make_col measurementDuration:duration_sec(int64(temperatureMeasurement.measurementDur))
Step 4 converts the numeric value, measurementDur, to a duration using the duration_sec function.
Figure 14 - Converting Numeric Values to Duration Values
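As a worked example based on the valid measurement shown above, where measurementDur is 128:
// temperatureMeasurement.measurementDur = 128 (a JSON number)
// int64(...) casts it to an integer, and duration_sec(128) produces a 128-second duration value
make_col measurementDuration:duration_sec(int64(temperatureMeasurement.measurementDur))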
Creating Stage C - Unstructured Records¶
Return to Stage A and click Link New Stage to add a third stage.
Rename the new stage to Stage C - Unstructured Records.
Add the following block of OPAL code and click Run:
// Step 1: Filter to only the events from the log file, which come in as a single event per line
filter format = "log"
// Step 2: For records that contain text and JSON as a string, the payload is in the FIELDS.text string
make_col logRaw:string(FIELDS.text)
// Step 3: Each line of the log file is a separate row. Merge multiline input into a single logical event
merge_events
match_regex(logRaw, /\[\d{2}\/\d{2}\/\d{4}.*\]\[Thermostat.*\]/),
options(max_size: 1000, max_interval: 1m),
logMessageMultiline:string_agg(logRaw, ""),
order_by(BUNDLE_ID, OBSERVATION_INDEX),
group_by(format)
// Step 4: Extract the log message and the temperature measurement
extract_regex logMessageMultiline, /(?P<logMessageFromText>\[.*]Observed temperature:)\s(?P<temperatureMeasurement>{.*})/, "ims"
// Step 5: Parse temperature measurement json. Because we are joining this to another column of "object" datatype,
// coerce it from "any"/variant datatype returned from parse_json to the "object" datatype
make_col temperatureMeasurement:object(parse_json(temperatureMeasurement))
// Step 6: Filter to only those measurements that had valid timestamps in the JSON structure
filter path_exists(temperatureMeasurement, "current.dt")
// Step 7: In this log, duration of measurement is stored in the messy CSV label of the message
// Extract the comma-separated value of interesting context data from freeform text
extract_regex logMessageFromText, /\[context:(?P<measurementContextRaw>.*)\]\[/, "i"
// Parse the comma-separated context value
make_col measurementContextCsv:parse_csv(measurementContextRaw, false)
// The third item in the comma-separated string is the length of duration. You can easily index into it for display
make_col measurementDuration:duration_sec(int64(measurementContextCsv[2]))
Figure 15 - Stage C after Running OPAL Code
Step 1 - Filter Data using the filter Function¶
This step filters data from Stage A to contain only records with the value log in the format column. Focusing only on the desired records saves processing resources.
Observe ingests these records in the text/plain format and automatically stores them in the FIELDS column as an object of any data type, with each incoming line in a text key, for example:
{"text":"[05/05/2023 13:59:18.847][Thermostat/112.0.0.0][context:2018/2018,0,64,330130][I:SH2WB01IN1AWN]Observed temperature: {\r\n"}
{"text":" \"lon\": -74.5,\r\n"}
{"text":"[05/05/2023 13:59:18.847][Thermostat/112.0.0.0][context:2018/2018,0,32,330130][I:SH2WB01IN1AWN]Observed temperature: {\r\n"}
{"text":" \"dt\": 1646338698,\r\n"}
{"text":" \"dt\": 1646318698,\r\n"}
{"text":" \"lat\": 39.31,\r\n"}
{"text":" \"current\": {\r\n"}
Step 2 - Project a New Object Column using the make_col Function¶
Each row contains a single observation, but Observe can merge the related items into a single value. Use the make_col projection operation to extract the FIELDS.text key and cast the value to the logRaw string column, as shown below. This prepares you to merge events in the next step.
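Based on the sample lines above, the extraction looks like this (the sample value is abbreviated):
// Input observation: FIELDS = {"text":"[05/05/2023 13:59:18.847][Thermostat/112.0.0.0]...Observed temperature: {\r\n"}
// string(FIELDS.text) extracts the text key and casts it to a string column named logRaw
make_col logRaw:string(FIELDS.text)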
Step 3 - Combining Events with the merge_events Aggregate Verb¶
Each line of the log file is a separate row. Merge multi-line input into a single logical event.
Use the merge_events aggregate verb to merge consecutive events into a single event.
Details about each OPAL statement:
// Step 3: Each line of the log file is a separate row. Merge multiline input into a single logical event
merge_events
//^^^^^^^^^^ Name of the function
match_regex(logRaw, /\[\d{2}\/\d{2}\/\d{4}.*\]\[Thermostat.*\]/),
//^^^^^^^^^^ match_regex function
// ^^^^^^^ column to scan for values
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ regex to match that indicates beginning of the message
options(max_size: 1000, max_interval: 1m),
//^^^^^^^ options function
// ^^^^^^^^ maximum number of events to merge into single event
// ^^^^^^^^^^^^ maximum duration between events
logMessageMultiline:string_agg(logRaw, ""),
//^^^^^^^^^^^^^^^^^^^ name of column that will contain new event
// ^^^^^^^^^^ the aggregate expression that combines multiple events into single one
// ^^^^^^ column to operate on
// ^^ delimiter to combine events. In our case it's empty
order_by(BUNDLE_ID, OBSERVATION_INDEX),
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ how to order records
group_by(format)
// ^^^^^^ what to group the events by
To match the first row for merge_events, use the match_regex function to find the row that begins the event.
The options function is a special function that provides options to modify the behavior of various verbs and functions. It takes key/value pairs of modifiers. For merge_events, use max_size to indicate the maximum number of events to merge, and max_interval to indicate the maximum duration between the first and last event. For this tutorial, the selected value of 1 minute may be too generous, but in real-world examples there can be some time skew between incoming records that should be merged. Larger match intervals can lead to more computational credits used in processing data.
The string_agg aggregate function combines multiple values separated by a delimiter.
The order_by clause forces the right sequence of events to be combined. The default order for order_by is time-ascending. When records are ingested using the cURL command, all of the timestamps for those records are within nanoseconds of each other, and all file contents have the same BUNDLE_ID. Each line has a unique, increasing OBSERVATION_INDEX, making it easy to reconstruct the order of lines.
If you don’t specify group_by, the default grouping for merge_events is the set of primary key columns. In this example, you have not defined any primary keys, so for completeness, use the format column, which in this stage always has the same value.
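Conceptually, for the sample lines shown earlier, one merged event could look like the following sketch (the exact line order is reconstructed from OBSERVATION_INDEX):
// Consecutive rows, starting at a line that matches the [date][Thermostat...] header regex:
//   "[05/05/2023 13:59:18.847][Thermostat/112.0.0.0][context:2018/2018,0,64,330130][I:SH2WB01IN1AWN]Observed temperature: {\r\n"
//   "  \"lat\": 39.31,\r\n"
//   ... (the remaining lines of the JSON payload)
// string_agg(logRaw, "") concatenates them with an empty delimiter into a single logMessageMultiline
// value containing the header text followed by the complete JSON payload.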
Step 4 - Using the extract_regex Verb to Split Unstructured and Structured Data from a String¶
Extract the log message and the temperature measurement. You reconstructed multiple lines of the ingested log file into a single event in the logMessageMultiline string column, and in this step you use the extract_regex verb to convert the unstructured string into structured data, as broken down in the annotated example below. This produces the logMessageFromText and temperatureMeasurement string columns, with the latter containing a JSON string to parse. See the OPAL 101 Tutorial for a more detailed breakdown of extract_regex.
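For reference, the two named capture groups in the regular expression split the merged string into the unstructured prefix and the JSON payload:
extract_regex logMessageMultiline, /(?P<logMessageFromText>\[.*]Observed temperature:)\s(?P<temperatureMeasurement>{.*})/, "ims"
// (?P<logMessageFromText>...) captures everything from the first "[" through "Observed temperature:"
// (?P<temperatureMeasurement>{.*}) captures the JSON block that follows the whitespace
// "ims" sets the regex flags: i (case-insensitive), m (multi-line), s (dot matches newline)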
Step 5 - Creating an Object from a JSON String Using the parse_json and object Functions¶
Parse the temperature measurement JSON. Because you later join this to another column of the object data type, coerce it from the any/variant data type returned by parse_json to the object data type.
OPAL Details
make_col temperatureMeasurement:object(parse_json(temperatureMeasurement))
// ^^^^^^^^^^^^^^^^^^^^^^ New column name
// ^^^^^^ casting of 'any' type variable to 'object' type variable
// ^^^^^^^^^^^ parsing of json from text
// ^^^^^^^^^^^^^^^^^^^^^^ text value to parse
The parse_json function parses the string into a temperatureMeasurement column of the any data type, which can hold an object or an array. In a later stage, when you combine Stage C - Unstructured Records with the results of Stage B - Structured Records, the column types must match, and the temperatureMeasurement column in Stage B - Structured Records is already an object. Therefore, use the object function to coerce this column to the object data type.
Step 6 - Checking for Key Existence in Object Data with the path_exists Function¶
This step uses the same logic as Step 3 of Creating Stage B - Structured Records.
Step 7 - Parsing CSV Strings Using the parse_csv Function¶
In this log, the duration of the measurement is stored in the messy CSV label, and you extract the comma-separated value of interesting context data from free-form text.
OPAL Details
// Extract the comma-separated value of interesting context data from freeform text
extract_regex logMessageFromText, /\[context:(?P<measurementContextRaw>.*)\]\[/, "i"
// ^^^^^^^^^^^^^^^^^^^^^^ Column to extract with regular expression
// Parse the comma-separated context value
make_col measurementContextCsv:parse_csv(measurementContextRaw)
// ^^^^^^^^^ Parsing of CSV value
// ^^^^^^^^^^^^^^^^^^^^^ Column to parse CSV from
//
// The third item in the comma-separated string is the length of duration.
make_col measurementDuration:duration_sec(int64(measurementContextCsv[2]))
// ^^^^^^^^^^^^ Convert the value to duration in seconds
The example log message, when stripped of the JSON, contains a context block with some values:
[05/05/2023 13:59:18.847][Thermostat/112.0.0.0][context:2018/2018,0,16,330130][I:SH2WB01IN1AWN]Observed temperature:
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ you want this value
In this message, you extract the comma-separated string with the extract_regex verb and use the parse_csv function to convert it into an array. The third item in the comma-separated string is the duration length, and you can index into it for display purposes. Because arrays in OPAL are 0-based, the third value is at index 2; it contains the duration of the measurement taken by the sensor, which you convert to the duration data type using the duration_sec function, just as in Step 4 of Stage B - Structured Records.
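Following the first sample log line shown earlier, where the context value is 2018/2018,0,64,330130:
// parse_csv("2018/2018,0,64,330130") yields the array ["2018/2018", "0", "64", "330130"]
// measurementContextCsv[2] is "64" (index 2, the third item), so the result is a 64-second duration
make_col measurementDuration:duration_sec(int64(measurementContextCsv[2]))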
Creating Stage D - Structured and Unstructured Records¶
You have split the processing of data from Stage A into Stage B and Stage C. Now you bring the processing of the data back into a single stage.
Add a stage that links to Stage B using the following steps:
Return to Stage B.
Click Link New Stage to add a fourth stage.
Rename the new stage to Stage D - Structured and Unstructured Records.
Click Input next to the OPAL console and then click Add Input.
Select the Stage C - Unstructured Records stage.
When you add an input to the Stage, Observe creates a reference to the other Stage with an auto-generated suffix. If you named your Stages as presented in the tutorial, the name of the Stage reference should be C - Unstructured Records_-tc62. If not, adjust your OPAL input accordingly.
Add the following block of OPAL code to the OPAL console:
// Step 1: Combine Stages B and C. B is implied as the first Stage in the Inputs tab.
union @"C - Unstructured Records_-tc62"
// Step 2: Extract a couple of values from JSON
make_col lat:float64(temperatureMeasurement.lat), lon:float64(temperatureMeasurement.lon)
// Step 3: Parse temperature 3 different ways
// This is the cleanest, platform native way
make_col tempValue:float64(temperatureMeasurement.current.temp)
// For fans of using explicit object navigation, newtonsoft/json.net and gson style
make_col tempValue_alt1:get_field(object(get_field(temperatureMeasurement, "current")), "temp")
// For fans of jq-style extraction, use JMESPath expressions
make_col tempValue_alt2:get_jmespath(temperatureMeasurement, "current.temp")
// Step 4: Get a temperature gauge with conditional formatting. Unicode! Emojis!
make_col "🌡️ temp":case(
tempValue > 25, "☀️ warm",
tempValue > 10, "🌦️ sorta warm",
tempValue >= 0, "🌩️ cold",
true, "❄️ freezing")
// Step 5: Create an explicit array from JSON object that contains it
make_col past30DaysTemp:array(temperatureMeasurement.past30Days)
// How many days ago has the temperature hit exactly 20?
make_col howLongAgo20Degrees:array_length(past30DaysTemp) - index_of_item(past30DaysTemp, 20)
// Get a slice of the array showing last 10 days only
make_col past10DaysTemp:slice_array(past30DaysTemp, -10)
// Step 6: Choose the columns, sometimes renaming them. Drop all the others
pick_col
BUNDLE_TIMESTAMP,
lat,
lon,
tempValue,
@."🌡️ temp", // Needs special treatment, using @ as current stage name, and quoted around emojis howLongAgo20Degrees,
past10DaysTemp,
past30DaysTemp,
measurementDuration,
temperatureMeasurement,
format
Figure 16 - Stage D after Running OPAL Code
Step 1 - Combining the Results of Multiple Stages with the union Verb¶
Use the union verb to combine the results of the B - Structured Records and C - Unstructured Records stages.
If you named the stages as stated in each section, the name of the stage reference should be C - Unstructured Records_-tc62 because Observe adds a small semi-random hash to the name of the variable.
Unlike many database engines, where unioned tables must have identical schemas, the union verb in Observe does not require this. The two stages don’t fully match, and columns that exist in only one input receive NULL values for records from the other input.
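As a minimal sketch with hypothetical column names (not part of this tutorial's data), unioning a stage that has columns host and cpu with an input stage that has host and mem yields NULLs wherever a column exists in only one input:
// Current stage has columns host and cpu; @other has columns host and mem
union @other
// Result columns: host, cpu, mem
//   rows from the current stage have mem = NULL
//   rows from @other have cpu = NULL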
Step 2 - Extracting Numeric Values from a JSON Object using the make_col Verb and float64 Function¶
In this step, you extract latitude and longitude from the JSON object representing the measurement data. For records from Stage C - Unstructured Records, the object was reconstructed from multiple text lines and turned into an object, and it is addressed the same way as the object that arrived in Observe already in one piece.
Step 3 - Extracting Numeric Values from a JSON Object Natively with the get_field or get_jmespath Functions¶
In this step, you can see three ways of addressing JSON object paths using OPAL.
In the sample data, the observation has the following format:
{
"measurementDur": 16,
"lat": 49.31,
"lon": -74.5,
"current": {
"dt": 1646318698,
"temp": 22.21
},
"past30Days": [16, 15, ..., 18]
}
If you want to obtain the value of the temp key, a child of the current object, use OPAL with . (dot) notation and [0] indexers.
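With dot notation, the extraction from the sample object is a single expression, as used in Stage D:
// Dot notation walks current -> temp; float64 casts the value (22.21 in the sample) to a number
make_col tempValue:float64(temperatureMeasurement.current.temp)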
If you want to use explicit object navigation, OPAL provides the get_field function to extract a key from a given object. The function produces the same result as before, but uses more OPAL to do so.
//use explicit object navigation, newtonsoft/json.net and gson style
make_col tempValue_alt1:get_field(object(get_field(temperatureMeasurement, "current")), "temp")
// ^^^^^^^^^ ^^^^ Get the 'temp' property of child object
// ^^^^^^
// ^^^^^^^^^ ^^^^^^^^ Get the 'current' property of parent object
// ^^^^^^^^^^^^^^^^^^^^^^ Parent object
As a third option, OPAL implements the get_jmespath function, which uses JMESPath, a query language for JSON. Unlike the previous two approaches, get_jmespath may be more expensive to use.
//use JMESPath expressions
make_col tempValue_alt2:get_jmespath(temperatureMeasurement, "current.temp")
Step 4 - Using the case Function for Conditional Evaluation¶
Create a temperature gauge with conditional formatting and emojis.
Step 4 introduces conditional evaluation using the case function. OPAL evaluates condition and result pairs in argument order, and the first condition that evaluates to true ends execution.
This step also highlights that the Observe platform allows for a bit of color and fun with any Unicode character, including emojis, in both values and field names.
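For example, with the sample temperature of 22.21, the first condition (tempValue > 25) is false and the second (tempValue > 10) is true, so evaluation stops there:
// tempValue = 22.21 produces "🌦️ sorta warm"; the remaining pairs (>= 0, true) are never evaluated
make_col "🌡️ temp":case(
tempValue > 25, "☀️ warm",
tempValue > 10, "🌦️ sorta warm",
tempValue >= 0, "🌩️ cold",
true, "❄️ freezing")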
Step 5 - Supporting Arrays with the array, array_length, index_of_item, and slice_array Functions¶
Like most programming languages, OPAL supports array creation, measurement, slicing, and indexing. Step 5 uses the array function to convert an implicit array from a JSON object to a real array, and the array_length function to measure the length of the array. The step also uses the index_of_item function to find the index of a specific item in the array, and the slice_array function to take the last 10 items from the longer array.
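As a worked example with hypothetical values, assuming index_of_item returns a 0-based index: if past30DaysTemp holds 30 readings and the value 20 appears at index 24, then:
// array_length(past30DaysTemp) = 30 and index_of_item(past30DaysTemp, 20) = 24
// howLongAgo20Degrees = 30 - 24 = 6
make_col howLongAgo20Degrees:array_length(past30DaysTemp) - index_of_item(past30DaysTemp, 20)
// slice_array(past30DaysTemp, -10) keeps only the last 10 readings
make_col past10DaysTemp:slice_array(past30DaysTemp, -10)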
Step 6 - Using the pick_col Verb to Select Columns to Display¶
You’ve created a lot of intermediate columns so far in this tutorial. Now you can use the OPAL pick_col verb to choose the columns you want to see as the final result and optionally rename them.
Note that to reference a field with non-alphanumeric characters, like the fun emoji thermometer, you must quote it and prefix it with @., as in @."🌡️ temp".
Creating Stage E - Use One Stage with Subqueries (optional)¶
A standard OPAL query consists of a linear series of steps with one or more dataset inputs. Using Stages as shown in this tutorial makes it possible to isolate portions of processing into logically isolated units of work and build data flows that split and merge as needed.
You can combine all the work you’ve done in earlier Stages into a single Stage with a single OPAL script. Observe enables this using the concept of Subquery. Logically, a Subquery is similar to a Stage using an input Dataset.
A subquery consists of a name prefixed by the @ character, followed by the definition in curly brackets ({ with your OPAL inside }). You then invoke the subquery. In the following example, <- @ indicates the use of the default primary input, weather, without explicitly specifying it by name. The final <- @test_a {} invokes @test_a to generate the result:
// Starting from some dataset, which contains the field "city"
@test_a <- @ {
filter city="Chicago"
}
// this is the final result of the entire query with subquery @test_a as its input
<- @test_a {}
Adding a New Stage¶
Select the OPAL102Token Event Dataset.
Rename the stage to Stage E - Using just One Stage with Subqueries.
Adding OPAL Statements to the New Stage¶
Add the following OPAL statements to the OPAL console, and click RUN.
// Determine the type of observations sent to Observe
make_col format:string(EXTRA.format)
// Subquery B: Just like Stage B - Structured Records
@subquery_B_structured_records <- @ {
// Step 1: Filter to only the events that are structured in JSON format
filter format = "jsonarray"
// Step 2: For records that contain JSON, entire structured payload is in the FIELDS as a VARIANT/any
make_col temperatureMeasurement:FIELDS
// Step 3: Filter to only those measurements that had valid timestamps in the JSON structure
filter path_exists(temperatureMeasurement, "current.dt")
// Step 4: In this type of message, duration of measurement is stored in the nice property of JSON object
make_col measurementDuration:duration_sec(int64(temperatureMeasurement.measurementDur))
}
// Subquery C: Just like Stage C - Unstructured Records
@subquery_C_unstructured_records <- @ {
// Step 1: Filter to only the events from the log file, which come in as a single event per line
filter format = "log"
// Step 2: For records that contain text and JSON as a string, the payload is in the FIELDS.text string
make_col logRaw:string(FIELDS.text)
// Step 3: Each line of the log file is a separate row. Merge multiline input into single logical event
merge_events
match_regex(logRaw, /\[\d{2}\/\d{2}\/\d{4}.*\]\[Thermostat.*\]/),
options(max_size: 1000, max_interval: 1m),
logMessageMultiline:string_agg(logRaw, ""),
order_by(BUNDLE_ID, OBSERVATION_INDEX),
group_by(format)
// Step 4: Extract the log message and the temperature measurement
extract_regex logMessageMultiline, /(?P<logMessageFromText>\[.*]Observed temperature:)\s(?P<temperatureMeasurement>{.*})/, "ims"
// Step 5: Parse temperature measurement json. Because we are joining this to another column of "object" datatype,
// coerce it from "any"/variant datatype returned from parse_json to the "object" datatype
make_col temperatureMeasurement:object(parse_json(temperatureMeasurement))
// Step 6: Filter to only those measurements that had valid timestamps in the JSON structure
filter path_exists(temperatureMeasurement, "current.dt")
// Step 7: In this log, duration of measurement is stored in the messy CSV label of the message
// Extract the comma-separated value of interesting context data from freeform text
extract_regex logMessageFromText, /\[context:(?P<measurementContextRaw>.*)\]\[/, "i"
// Parse the comma-separated context value
make_col measurementContextCsv:parse_csv(measurementContextRaw)
// The third item in the comma-separated string is the length of duration. We can easily index into it for display
make_col measurementDuration:duration_sec(int64(measurementContextCsv[2]))
}
// Unioning things together: Just like Stage D - Structured And Unstructured Records, combining back to the parent
<- @subquery_B_structured_records {
union @subquery_C_unstructured_records
}
// Step 2: Extract a couple of values from JSON
make_col lat:float64(temperatureMeasurement.lat), lon:float64(temperatureMeasurement.lon)
// Step 3: Parse temperature 3 different ways
// This is the cleanest, platform native way
make_col tempValue:float64(temperatureMeasurement.current.temp)
// For fans of using explicit object navigation, newtonsoft/json.net and gson style
make_col tempValue_alt1:get_field(object(get_field(temperatureMeasurement, "current")), "temp")
// For fans of jq-style extraction, use JMESPath expressions
make_col tempValue_alt2:get_jmespath(temperatureMeasurement, "current.temp")
// Step 4: Get a temperature gauge with conditional formatting. Unicode! Emojis!
make_col "🌡️ temp":case(
tempValue > 25, "☀️ warm",
tempValue > 10, "🌦️ sorta warm",
tempValue >= 0, "🌩️ cold",
true, "❄️ freezing")
// Step 5: Create an explicit array from JSON object that contains it
make_col past30DaysTemp:array(temperatureMeasurement.past30Days)
// How many days ago has the temperature hit exactly 20?
make_col howLongAgo20Degrees:array_length(past30DaysTemp) - index_of_item(past30DaysTemp, 20)
// Get a slice of the array showing last 10 days only
make_col past10DaysTemp:slice_array(past30DaysTemp, -10)
// Step 6: Choose the columns, sometimes renaming them. Drop all the others
pick_col
BUNDLE_TIMESTAMP,
lat,
lon,
tempValue,
@."🌡️ temp", // Needs special treatment, using @ as current stage name, and quoted around emojis
howLongAgo20Degrees,
past10DaysTemp,
past30DaysTemp,
measurementDuration,
temperatureMeasurement,
format
Defining the Subqueries and Interactions
// ... common processing before subqueries
// Subquery B: Just like Stage B - Structured Records
@subquery_B_structured_records <- @ {
// ... OPAL code for Stage B - Structured Records omitted ...
}
// Subquery C: Just like Stage C - Unstructured Records
@subquery_C_unstructured_records <- @ {
// OPAL code for Stage C - Unstructured Records omitted
}
// Unioning things together: Just like Stage D - Structured And Unstructured Records, combining back to the parent
<- @subquery_B_structured_records {
union @subquery_C_unstructured_records
}
// ... common processing after subqueries
You defined two subqueries, subquery_B_structured_records and subquery_C_unstructured_records, with the relevant code blocks, and they execute in parallel. You then merged the results together by unioning subquery_C_unstructured_records into subquery_B_structured_records and sending the results back to the primary dataset using the <- operator.
The results of running this single block of OPAL should be identical to what you see in Stage D - Structured And Unstructured Records.
Figure 17 - Stage E after Running OPAL Code
Wrapping Up the Tutorial¶
To complete the tutorial, click the Pencil icon at the top of your Worksheet and name your Worksheet OPAL 102 Building Structured and Unstructured Example. Click Save to save the Worksheet.
Summary of Key Topics in the OPAL 102 Tutorial¶
In this tutorial, you learned how to perform the following tasks:
Create a Datastream to ingest data
Create an Ingest Token for the Datastream
Use cURL to ingest your log files into Observe in different formats
Create a Worksheet from your Dataset
Merge multiple events from multiple observations into a single event
Use structured data to parse and manipulate JSON
Parse comma-separated values
Use array functions
Use type coercion
Use subqueries
Use Stages to split processing into logical chunks focused on just the applicable data
Use named subqueries to split processing