Batch Ingestion with Observe

The ideal scenario for collecting data into any cloud service is continuous streaming: data is generated in a containerized microservice, pushed to a webhook, or collected by an agent. However, conditions are not always ideal. Sometimes data has to be polled from a collection point, possibly manipulated, and then sent on to Observe. If an Observe poller already exists in Integrations, use it. This tutorial discusses tools for batched data collection when a poller does not already exist. The process offers many options, which are organized here into Collection and Transmission.

Collection

The first piece of work is to get the data out of the source. Using Atlassian’s Jira as an example, here is a short Python 3 subroutine that uses the Python requests library to collect the options defined for a custom field. A webhook-based integration exists for Jira events, but this subroutine is intended for collecting metadata about the Jira project from a custom field.

import requests

def query_get_jira_customers():
    # Builds the query URL and pages through the options of a Jira custom field.
    # `args` and `logger` are assumed to be defined at module level by the calling script.
    # Atlassian caps the page size server-side, so the offset advances by the
    # number of records actually returned rather than by the requested maximum.
    jira_url_base = (args.JIRA_API_URL + "/ex/jira/" + args.JIRA_ACCOUNT
                     + "/rest/api/3/field/" + args.JIRA_FIELD
                     + "/context/" + args.JIRA_CONTEXT + "/option")
    jira_header = {'Accept': 'application/json'}
    jira_auth = (args.JIRA_USER, args.JIRA_TOKEN)
    jira_results = []
    jira_curr = 0  # Jira's startAt offset is zero-based
    while True:
        jira_params = {'startAt': jira_curr, 'maxResults': 100}
        try:
            jira_result = requests.get(jira_url_base, headers=jira_header,
                                       params=jira_params, auth=jira_auth)
            jira_result.raise_for_status()
        except requests.exceptions.RequestException as e:
            raise SystemExit(e)
        jira_result_json = jira_result.json()
        jira_values = jira_result_json.get('values', [])
        jira_results += [item.get('value', 'unknown') for item in jira_values]
        jira_curr += len(jira_values)
        # Stop on an empty page or once every record has been read
        if not jira_values or jira_curr >= jira_result_json.get('total', 0):
            break
    jira_results = sorted(set(jira_results))
    logger.info("Collected %d unique values from Jira", len(jira_results))
    return jira_results

After calling this subroutine from a script, serverless infrastructure, or container, you can sort, deduplicate, transform, or enrich the data with any of the tools that Python offers.
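As a rough illustration of that post-processing step, the sketch below turns the list returned by query_get_jira_customers() into JSON-serializable records. The function name build_customer_records, the field names customer, source, and collected_at, and the sample values are all assumptions made for this example; they are not part of any Jira or Observe schema.

```python
import json
from datetime import datetime, timezone


def build_customer_records(customers, source="jira", collected_at=None):
    """Normalize, deduplicate, and enrich a list of customer names.

    Hypothetical helper: the record layout is an assumption for illustration.
    """
    if collected_at is None:
        collected_at = datetime.now(timezone.utc).isoformat()
    # Strip whitespace, drop empty entries, and deduplicate case-insensitively,
    # keeping the first spelling seen for each name
    seen = {}
    for name in customers:
        cleaned = name.strip()
        if cleaned:
            seen.setdefault(cleaned.lower(), cleaned)
    # Sort for stable output and attach the enrichment fields
    return [
        {"customer": seen[key], "source": source, "collected_at": collected_at}
        for key in sorted(seen)
    ]


if __name__ == "__main__":
    sample = ["Acme ", "acme", "Globex", ""]
    print(json.dumps(build_customer_records(sample), indent=2))
```

The resulting list can be serialized with json.dumps and handed to whichever transmission method is chosen.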

Transmission

Once collected, this data can be put into Observe in a number of ways. Each row could be written directly to the HTTP endpoint, for instance. However, for large amounts of data you might want more control over resource utilization and error handling. There are two mechanisms that are well suited for bulk ingestion into Observe: Filedrop, and the observe-http-sender Python client library.
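Whichever mechanism is used, one way to keep resource use and error handling under control is to split the collected rows into fixed-size batches, so each write is bounded in size and a failure affects only one batch. A minimal sketch follows; the helper name chunk_records and the default batch size of 500 are assumptions for illustration, not Observe recommendations.

```python
from itertools import islice


def chunk_records(records, batch_size=500):
    """Yield successive lists of at most batch_size records."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    iterator = iter(records)
    while True:
        # Pull the next batch_size items; an empty list means we are done
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch
```

Each batch can then be written as its own file or posted as its own request, and a failed batch can be retried without resending the rest.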

Filedrop

To use Filedrop, your Python script writes the data to a file in an AWS S3 bucket. An Observe administrator can then configure Observe to read files written into that bucket. One benefit of this approach is that the files live in S3 and can be managed separately from ingestion.

Writing files to S3 can be quite simple, using Amazon’s Boto3 Python client library. A sample subroutine:

import boto3

def write_json(file_data, file_name):
    # `bucket_name` is assumed to be defined at module level by the calling script.
    filename = file_name.replace(" ", "_") + ".json"
    s3_client = boto3.client('s3')
    s3_client.put_object(
        Body=file_data,
        Bucket=bucket_name,
        Key=filename,
        ContentType='application/json',
    )

You might also change filename to include a unique ID or add a routine to purge old files.
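One possible way to make the key unique is sketched below. It combines a date-based prefix, a UTC timestamp, and a short random ID. The function name unique_object_key, the default prefix "jira", and the key layout are assumptions chosen for illustration; any scheme that avoids collisions would do.

```python
import uuid
from datetime import datetime, timezone


def unique_object_key(file_name, prefix="jira", now=None):
    """Build an S3 object key that is unique per upload.

    Hypothetical naming scheme:
    <prefix>/<YYYY>/<MM>/<DD>/<name>_<timestamp>_<id>.json
    """
    now = now or datetime.now(timezone.utc)
    safe_name = file_name.replace(" ", "_")
    stamp = now.strftime("%Y%m%dT%H%M%SZ")
    # Eight hex characters from a random UUID guard against same-second collisions
    short_id = uuid.uuid4().hex[:8]
    return f"{prefix}/{now:%Y/%m/%d}/{safe_name}_{stamp}_{short_id}.json"
```

The returned string could be passed as Key in write_json in place of filename. Grouping keys under a date prefix also makes old files easier to find when purging, whether by a script or by an S3 lifecycle rule.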

Once configured in Observe, the Filedrop ingests all files in the bucket automatically.

Python sender

Observe’s observe-http-sender is a Python module that uses the HTTP endpoint, but adds asynchronous batching and flushing so that larger volumes are handled more gracefully. A sample subroutine might look like this:

from observe_http_sender import ObserveHttpSender

def post_to_observe(data, counter):
    # Set up the Observe sender and its logging level.
    # `args`, `logger`, and the logging_level() helper are assumed to be
    # defined elsewhere in the calling script.
    observer = ObserveHttpSender(args.OBSERVE_URL, args.OBSERVE_TOKEN)
    observer.log.setLevel(logging_level(args.OBSERVE_LOGGING_LEVEL))
    # Set the post path.
    observer.set_post_path("/mydata")
    # Fail early if Observe is not reachable.
    if observer.check_connectivity() is False:
        raise Exception("Observer Not Reachable: URL=%s" % (args.OBSERVE_URL))
    try:
        # Queue the data, then flush so any remaining batched records are sent.
        observer.post_observation(data)
        observer.flush()
    except Exception as e:
        logger.warning(e)
        raise
    logger.info("Posted %s data blocks to Observe", counter)