Batch Ingestion with Observe¶
The ideal scenario for data collection into any cloud service is continuous streaming: data is generated in a containerized microservice, pushed to a webhook, or collected by an agent. However, not all conditions are ideal. Sometimes data must be polled from a collection point, possibly manipulated, and then sent on to Observe. If a poller for your data source already exists in Integrations, use it. This tutorial covers tools for performing batched data collection when a poller does not already exist. The process offers many options, organized here into two phases: Collection and Transmission.
Collection¶
The first piece of work is to get the data out of the source. Using Atlassian’s Jira as an example, here is a short Python 3 subroutine that uses the Python requests library to collect the items in a custom field. A webhook-based integration exists for Jira events, but this subroutine is intended for collecting metadata about the Jira project from a custom field.
import json

import requests

# args (parsed command-line arguments) and logger are assumed to be
# defined at module scope by the surrounding script.

def query_get_jira_customers():
    # Construct the query URL and make the requests directly.
    # Asking for more records per page is pointless because Atlassian
    # hard-codes the maximum page size server-side.
    jira_url_base = (args.JIRA_API_URL + "/ex/jira/" + args.JIRA_ACCOUNT
                     + "/rest/api/3/field/" + args.JIRA_FIELD
                     + "/context/" + args.JIRA_CONTEXT + "/option?")
    # Fetch a single record first to learn the total count.
    jira_url_string = jira_url_base + "&startAt=1&maxResults=1"
    jira_header = {
        'Accept': 'application/json',
    }
    jira_total = requests.get(jira_url_string, headers=jira_header,
                              auth=(args.JIRA_USER, args.JIRA_TOKEN)).json()['total']
    jira_results = []
    jira_customers = []
    jira_curr = 1
    while jira_curr < jira_total:
        # Page through the remaining records 100 at a time.
        jira_url_string = jira_url_base + "&startAt=" + str(jira_curr) + "&maxResults=100"
        try:
            jira_result = requests.get(jira_url_string, headers=jira_header,
                                       auth=(args.JIRA_USER, args.JIRA_TOKEN))
        except requests.exceptions.RequestException as e:
            raise SystemExit(e)
        jira_result_json = json.loads(jira_result.content)
        jira_customers = [item.get('value', 'unknown') for item in jira_result_json.get('values', [])]
        jira_results = jira_results + jira_customers
        jira_curr = len(jira_results)
    # Deduplicate and sort the collected values.
    jira_results = sorted(set(jira_results))
    message = "Collected " + str(jira_total) + " records from Jira"
    logger.info(message)
    return jira_results
After calling this subroutine from a script, serverless function, or container, you can sort, deduplicate, transform, or enrich the data using any of the tools Python provides.
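For example, a small helper can wrap each collected value in a dictionary that carries a timestamp and a source tag, so every record arrives in Observe with its own context. This is only a sketch; the enrich_records name and the value, source, and collected_at field names are illustrative, not anything Observe requires:

from datetime import datetime, timezone

def enrich_records(values, source="jira"):
    # Wrap each raw value in a dictionary with collection context.
    collected_at = datetime.now(timezone.utc).isoformat()
    return [
        {"value": v, "source": source, "collected_at": collected_at}
        for v in values
    ]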
Transmission¶
Once collected, this data can be sent to Observe in a number of ways. Each record could be written directly to the HTTP endpoint, for instance. For large amounts of data, however, you may want more control over resource utilization and error handling. Two mechanisms are well suited to bulk ingestion into Observe: Filedrop and the observe-http-sender Python client library.
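For small volumes, a direct POST per record or batch can be enough. A minimal sketch, assuming a collector URL of the form https://<customer_id>.collect.observeinc.com and a datastream token; the /mydata path segment and the post_record name are illustrative:

import requests

def post_record(record, observe_url, observe_token):
    # POST one JSON record to the Observe HTTP endpoint under /v1/http.
    response = requests.post(
        observe_url + "/v1/http/mydata",
        headers={
            "Authorization": "Bearer " + observe_token,
            "Content-Type": "application/json",
        },
        json=record,
    )
    response.raise_for_status()
    return response.status_code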
Filedrop¶
To use Filedrop, your Python script writes the data to a file in an AWS S3 bucket, and an Observe administrator configures Observe to read files written to that bucket. Because the files persist in S3, they can be retained, inspected, and managed independently of the ingestion pipeline.
Writing files to S3 is straightforward with Amazon’s Boto3 Python client library. A sample subroutine:
import boto3

def write_json(file_data, file_name, bucket_name):
    # file_data is assumed to be a JSON-encoded string;
    # bucket_name is passed in rather than read from a global.
    filename = file_name.replace(" ", "_") + ".json"
    s3_connection = boto3.resource('s3')
    s3_connection.meta.client.put_object(
        Body=file_data,
        Bucket=bucket_name,
        Key=filename,
        ContentType='application/json',
    )
You might also change filename to include a unique ID, or add a routine to purge old files.
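For instance, a hypothetical unique_key helper could stamp each key with the upload time and a random suffix so that successive runs never overwrite one another (the naming scheme here is just one possibility):

import uuid
from datetime import datetime, timezone

def unique_key(file_name):
    # Produces keys like "jira_customers-20240101T120000Z-1a2b3c4d.json"
    stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    return file_name.replace(" ", "_") + "-" + stamp + "-" + uuid.uuid4().hex[:8] + ".json"

Using the result as the S3 Key in place of the filename constructed inside write_json keeps every upload distinct, at the cost of needing a cleanup routine later.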
Once Filedrop is configured in Observe, it automatically ingests all files written to the bucket.
Python sender¶
Observe’s observe-http-sender is a Python module that uses the HTTP endpoint but adds asynchronous batching and flushing to handle larger volumes more appropriately. A sample subroutine might look like this:
from observe_http_sender import ObserveHttpSender

# args, logger, and the logging_level() helper are assumed to be
# defined at module scope by the surrounding script.

def post_to_observe(data, counter):
    # Set up the Observer and its logging level.
    observer = ObserveHttpSender(args.OBSERVE_URL, args.OBSERVE_TOKEN)
    observer.log.setLevel(logging_level(args.OBSERVE_LOGGING_LEVEL))
    # Set the Observer post path.
    observer.set_post_path("/mydata")
    # Check the Observer for reachability.
    if observer.check_connectivity() is False:
        raise Exception("Observer Not Reachable: URL=%s" % (args.OBSERVE_URL))
    # Queue the observations; the sender batches them asynchronously.
    try:
        observer.post_observation(data)
    except Exception as e:
        logger.warning(e)
        raise
    # Flush any batched observations that have not yet been sent.
    try:
        observer.flush()
    except Exception as e:
        logger.warning(e)
        raise
    message = "Posted " + str(counter) + " data blocks to Observe"
    logger.info(message)
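Putting the pieces together, a hypothetical driver could collect, enrich, and post the records in one pass (enrich_records is the sketch from the Collection section; as in the sample above, data is whatever payload you hand to post_observation):

def main():
    # Collect the custom-field values, enrich them, and send them on.
    jira_results = query_get_jira_customers()
    records = enrich_records(jira_results)
    post_to_observe(records, len(records))

if __name__ == "__main__":
    main()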