Architecture for non-deterministic mass data collection: part 1: collection engine

bitsofinfo
Level Up Coding
Published in
17 min readJan 23, 2022

--

Note, this is part one of a two part series about this project; article #2 is here.

One of my more recent projects was spawned from a pretty interesting idea. The team wanted to build a system that would permit them to scour the Internet for information regarding a particular set of targets; a “target” being defined as product, company, topic, person, location etc… anything. Once collected, any particular target as well as collections of targets could be used for historical and predictive analysis for various goals.

For each target, a select curated set of very specific endpoints would serve as the entry points on the Internet to find information on the target, and then from there, linkages could be followed for supported endpoints and the data from those linkages collected.. and on an on… similar to a crawler but on more of a leash. The goal was that data being collected on a target would be further curated with custom logic into more enriched data sets for each target as well as how any particular target relates to other targets. The desire was that new targets would also be discovered throughout this process and then promoted to first class targets that could subsequently be monitored going forward. Again, all of this data was to be utilized for historical and predictive analysis.

Where to start? Logical architecture

As we started, some of the decisions were already made for the team such as the cloud provider that this would execute in: AWS. From there, given a project of this scale it would be easy to get overwhelmed so we ended up breaking it up into some high level design goals :

  • First design a data lake structure that would store all the raw data collected in an organized manner. This lake would also need to be able to store additional permutations of the data as it gets refined through a lifecycle. All data would be immutable.
  • Create a generic collection/crawling engine that would collect raw target data from various endpoints, lightly wrap it in structure, decorate it with relevant meta-data and store it in the data lake as documents. The ability to add support for new endpoints should be relatively easy. The engine needed to be recursive, but on a tight leash. The raw data stored from the engine at this phase would be deemed level 1 data as it would be stored “as-is” (document format) could serve as a permanent snapshot in time of what was collected on that run.
  • Each collector “result” document would contain a meta-data such as its own unique identifier, timestamps, parent lineage information,
  • Utilize a secondary database that the collection engine would use to store additional meta-data about what was collected as well as to track lineage relationships between all things collected. Ideally this would permit us to build a map of what was crawled and serve as a high level graph for each collection run for a given target that could be referenced as need be in subsequent steps.
  • Develop a secondary process that would consume the raw result documents and convert them into a more structured flat representation of the raw format (more like a table). These converted results were to be stored in the data lake in one or more format(s) (columnar and/or row oriented) that could be queried by standard big data tooling (both by humans and automation) for further analysis and processing. The data at this phase would be deemed data.
  • Next create a tertiary process that could consume the structured data and convert it into highly structured concrete types with many additional attributes specifically relevant to the actual endpoint data embedded within the dataset wrapper. This would be called Layer 2 Type Specific (L2TS) data. This would also likely be the most challenging portion to make a reality at scale. L2TS data would be the core data set that would be consumed both data analysts, ML engineers and ultimately a logic layer for enrichment and promotion to first class representations of targets (L3 and beyond) that would ultimately be used downstream for other goals.

The list above represented the first set of goals that would serve as the foundation of a multi-staged process that would continually feed the data lake with consumable, structured data that was sourced from highly varying underlying endpoints. The diagram below shows the logical architecture and shows how the data for any given raw result would start out quite coarse and then explode into much more granular refinements through the different layers of transformation.

So what might this actually look like in reality and what might be an example “target” and an “endpoint” we want to collect information from? Lets walk through a use case:

In this case a collector implementation for the Twitter APIs would be need to be developed. It would support taking a configurable payload with arguments, one of which being a string target (in this case “kayaking in florida”). With this target string its implementation could execute a twitter search API invocation for the phrase “kayaking in florida”, capturing the raw JSON API response as it paginates through results and finally wrapping the paged results in a larger collector result object (containing collector metadata). This result object would contain up to N “pages” of tweets from the search for example. The embedded raw results from the Twitter API call would be left as-is, and the overall result object wrapping it would be marshaled to JSON and flushed to the data lake. Collector meta-data about the result would also be stored in a secondary database.

Collection engine

The first logical place we started was to design and build the collection engine itself. Since the implementation would utilize AWS it was decided to implement all functionality of the collector engine using Lambda functions in combination with SNS/SQS so that the crawler engine could be fully reactive, event driven and take advantage of the massive amount of parallel processing needed to efficiently complete a collector engine execution. Each execution could be different each time it ran due to the non-deterministic nature a target collection run could take on any given invocation.

The collector engine itself was originally prototyped in Python as a Serverless Framework project leveraging numerous plugins and could be fully mocked locally on developer machines using serverless offline. From this prototype patterns were identified and much of the implementation was moved into a core Python module that provided much of the overall flow implementation while relying on callers of the module to provide a handful of concrete hook implementations to fulfill the particulars of any given collector.

What was a collector? A collector was simply a Serverless project created for a specific supported endpoint that provided a handful of Lambda functions that would invoke the core Python module's suite of functions who's arguments where instances of the collectors hook implementations that fulfilled the specific work particular to a collector. When support for a new collector was needed, a new Serverless project was scaffolded from a Serverless Project template made specifically for collectors that provided much of the boilerplate while letting collector developers to focus on the few required implementation details to actually pull, process and react to data fetched from the collectors endpoint (i.e. an 3rd party api or website etc)

Here is a high level view of the engine to provide some context for the paragraphs that will follow:

Collector types

There could be dozens of different collector implementations.. an ever growing number. The decision on what kinds of collector's to implement was driven by the organization and was very specific as we were not trying to implement a super generalized crawler in the traditional sense. For example, collectors were generally implemented for the following categories of endpoints that target data could be ingested from.

  • public/private search engines
  • public/private 3rd party APIs
  • public/private social media APIs
  • specific public/private content/news outlets and asset repositories

In addition to dozens of the above, there were a few special collectors developed for specific use cases.

  • generic raw HTTP: this implementation was specifically used for exploration and collecting data from linkages to yet-to-be developed endpoints. This collector could return content and utilized various libraries internally to parse raw response data and make its best attempts to identify relevant text, further linkages and remove any code. Once ingested, automation/analysis could be used to determine relevancy and whether or not certain linkages should be promoted to first class collectors for future monitoring.
  • orchestrator: this implementation was used to “bootstrap” one or more other collectors to begin the crawl for given topic. The payload for the orchestrator would contain an instruction set of child payloads for other collectors that needed to be invoked. The orchestrator invocation would have a special collector id that would be used as the root lineage parent for all other child collectors spawned from it. Orchestrator invocations were run on a scheduled or on-demand basis in reaction to new topics/targets that were identified real-time or by analysts that were worthy of their own collector crawls.

Given the bulk of the workflow logic was implemented in a separate Python module that each Serverless collector project could import; each collector implementation only had to fulfill about three (3) functions. Generally the first to fetch the data and publish an array of named datasets to a result consumer, the second to react to another payload and lastly one to convert results into more structure dataset item records. (more on this below)

Given a Serverless Framework Template was created and used to to instantiate each individual collector’s own Serverless project. All of the individual collector Serverless projects resided within a single monorepo along with the core module and project template itself.

Collectors

Each collector project was specific to a particular supported endpoint. The collector's responsibility when invoked was to make a request as instructed by the payload arguments, parse the response (paging if necessary), write the L1 result documents out to the data lake and publish a collector event. Secondarily each collector was responsible for reacting to collector events received on its own queue as well as well as converting L1 result documents to L2 dataset items. Conversion into L2TS items would occur via additional downstream processes.

Collectors: collect()

The collect() function was the primary entry point to instruct a collector to go and fetch the data it is instructed to do so via a payload that contains arguments that would vary per collector. Each collector's implementation was responsible for providing the implementation for a DataFetcher which would do the specific request logic particular to that collector's supported endpoint as configured by the payloads arguments. When collect() was invoked, the collector would construct an instance of the core module's Collector class and passing the following to it's execute() method:

  • AWSInvocation: bound to the inbound payload and abstracts away the details of invocations from various Lambda invocation methods and their differences (SQS, api-gateway http, lambda direct). Invocation objects yielded the relevant payload + header info in a consistant format without having to worry about the lambda request/context differences.
  • DataFetcher: does the actual request logic and then write to a ResultConsumer
  • ResultConsumer: would write an L1 result document to S3, secondly write metadata to a graph database and lastly publish a CollectorEvent to SNS

The call to Collector.execute() would utilize the above arguments and orchestrate the overall flow as shown in the diagram below:

Collectors: collect(): data sets

Upon invocation of collect(), each collector's DataFetcher.do_fetch() implementation produces an array of one or more named datasets that is passes to the ResultConsumer; a dataset is simply an arbitrarily named object that contains a id, name, tags and metadata with key/values relevant to its type. Most importantly addition to these fields, each dataset contains the actual data that was fetched from some endpoint, and the structure of this varies based on the dataset's "type". We settled on 3 different "types" based on the patterns of access that surfaced across the dozens of different endpoints.

  • SIMPLE: contains a data property that yields the actual data collected and a data_url property. For example this typically contained the raw API response body JSON "as is" or simply a large string of response content. This type was used when a single logical request yielded a single response. (non-paged)
  • PAGED: contains a map of pages data to store paginated results from a single logical collector request. Each page object contained an id, metadata, the page number, and the data element holding the actual data. Each page object was located in the pages map keyed by its individual page id. The metadata field also contained keys to get all pages by ordinal position numbers. This type was used when a single logical request yielded some kind of paged response from the endpoint.
  • KEY_VALUE: contains a map of key_values to store bucketed arbitrarily named result data split up however the collector needs to categorize it. Each key was the value's id. Each value was a special kv object containing an id, key name, metadata map, and the actual data containing the result data for that key. This type was used when a collector needed to arbitrarily categorize and break up a large/single collector response in some custom way.

For all of these, there was a loose “shared knowledge” based contract between the collector’s do_fetch() implementation and the consumer of a given collector's results, i.e. the react() method was often implemented in the same collector, or some other consumer. The collector documentation would explain the contents of each collectors datasets produced so that consumers could then utilize it effectively and generally the same developer who was writing a particular collector was also implementing the other consumption methods as well as working with any other analysts/logic that would ultimately make use of the collector's output downstream.

Collectors: react()

The react() function was a reactive entry point to instruct a collector to go and fetch the persisted L1 result doc from the data lake, analyze it, and then take further action such as invoking additional collector.collect() invocations either on itself or other related collector types; these invocations were over SQS. Each collector's implementation was responsible for providing the implementation for a AggregateEventReactorPlugin which would do the specific logic particular to that collector 's requirements for reacting to other collected results. For example, a reactor might analyze a result, pick out one or more particular identities and then use a secondary API to go fetch the details on that identity; or simply following all links.

One key point with the react() implementation was that without some sort of leash, things could get out of control depending on how a collector was coded, for example it might follow all links, creating more events, and on and on. The way this was controlled was via a reactor config passed in all collect payloads (and global settings) that controlled the max branch depth any reactor/collector could traverse. This was enforced at the core module level that managed the production of all event messages to SQS. This would ensure we could limit the size of a crawl within some estimation for any given target.

When react() was invoked, the collector would construct an instance of the core module's Reactor class and passing the following to it's execute() method:

  • AWSInvocation: see the collect() section above for details
  • AggregateEventReactorPlugin: does the actual processing logic

The call to Reactor.execute() would utilize the above arguments and orchestrate the overall flow as shown in the diagram below:

Collectors: convert()

The convert() function was an additional reactive entry point to instruct a collector to go and fetch the persisted a result in the data lake and convert it to one or more dataset items, each as Parquet and Avro.

Regarding datasets, recall from above that each JSON result document contains N number of datasets each potentially varying by type and each containing one or more sets of actual data. In order to efficiently get at each chunk of data embedded within the datasets, the conversion process would create a separate dataset_item for each data element within a dataset. So for a PAGED dataset with 10 pages, the conversion process would create 10 dataset_items each with full copies of the top level result document attributes and lineage information and additional attributes specific to their individual page. Likewise for a KEY_VALUE dataset with 2 KV pairs, would yield 2 dataset_items. For SIMPLE datasets one dataset_item would be created. Once this was done, now queries could be issued such as "get all dataset_items for search-result-a" which would yield 10 results, each representing the data from one page or "get all dataset_items for search-result-a where page_number = 7". Once converted to dataset_items now all results were effectively queryable by all their attributes and now routable by downstream automation. (note that the L2TS data embedded inside was yet to be processed or accessible in a structured way).

The whole point of converting each JSON result document into both Parquet and Avro formats was two fold.

  • First one of not being able to predict the exact future. The main priority was to get the data into a format that could be a bit more machine queryable, routable and readable as well as more easily usable by human analysts. By having fully expressed in both column and row oriented formats, this gave us more flexibility.
  • Second being that the JSON result docs were intended to be an immutable long term snapshot of the crawled data. In order to be able to properly discover and route it by high level attributes for further processing in an automated way, we needed to give it some more structure, flatten it and explode it into the individual dataset items. Think of dataset items as little cargo boxes with a manifest on them for routing. Inside the boxes was the actual data we cared about, but didn’t quite need to look inside the box just yet… downstream processes would be responsible for that.

To support the convert() function, each collector's implementation was responsible for providing the implementation for a AggregateEventConverterPlugin which would do the specific conversion logic which was primarily implemented in a core module library. When convert() was invoked, the collector would construct an instance of the core module's Converter class and passing the following to it's execute() method:

The call to Converter.execute() would utilize the above arguments and orchestrate the overall flow as shown in the diagram below

Collectors: orchestrator

As mentioned in a previous section, the orchestrator was a special collector implementation was used to "bootstrap" one or more other collectors to begin the crawl for given topic. After a given crawl for a target the graph of results would look something like the below. By organizing target crawls under a single parent, the entire graph of meta-data, and JSON results in the data lake could now be treated as a single package or "snapshot" of an entire crawl that could be processed as a single unit over an over as need be and compared with other "snapshots" to detect drift over time for various data points. Target collections could might be run hourly, daily, weekly etc. depending on the size of the target base.

Data lake

collectors would ultimately need to save their results data JSON documents into a data lake. The storage for this was to be S3, but the "folder" structure needed to be scalable and compatible with big data tooling taking into account partitioning.

We ended up coming up w/ a few different storage path options under target buckets so that collector implementations themselves could dictate the hierarchy their output files would reside in. There were numerous options but ultimately most collectors ended up writing output into the following lake structure, where ROOT_DIR_NAME generally was a S3 bucket specific to a individual collector or a local directory when running in development w/ AWS service mocks.

[ROOT_DIR_NAME]/year=[year]/
month=[month][/day=[day]]/[/hour=[hour]]/
clct_uuid=[clt_uuid]/
clct_version=[version]/
org=[org]/
project=[project]/
result_context_name=[resultContext]/
result_name=[resultName]/[result_file_document].json

Note that clct_uuid represented the top level root orchestrator collector id that would bootstrap an entire crawl for a given target/topic.

Metadata database

Although a large amount of meta-data, lineage information and audit data was recorded within each collector result, some out of band database was needed for the system to track meta-data about collector runs at a high level. This would serve useful for both admins, analysts, developers and downstream automation when processing a collector run. We ended up going with a graph database for this, Neo4j in particular. The graph database was a key element to be able to visually and programmatically traverse the lineage graph for a large set of collector runs. From these high level relationships, more granular relationships that were embedded deep within the data within results where able to have more context.

AWS Glue

As noted previously, when a collector’s convert() method is invoked in response to a CollectorEvent, the full result document is fetched from the S3 data lake and each dataset embedded within it is converted into both Parquet & Avro dataset_item and then written to the data lake.

All dataset_item 's are written to a dedicated location in the lake partitioned as follows:

[CONVERT_ROOT_DIR]/[parquet|avro]/  
compression=[compression type]/
dataset_items/
p_schema_version=[version]/
p_environment=[env]/
p_producer_name=[collector name]/
p_year=[yyyy]/
p_month=[mm]/
p_day=[dd]/
p_hour=[hh]/collector_[env]_dataset_items.[parquet|avro]

Structuring it this way permits an AWS Glue catalog to be created for the target bucket and then subsequently crawled after any new collection execution or on an otherwise scheduled basis. Once the data is catalogued it can then be queried using Athena or Spark jobs against via the partitions and any top level dataset item attributes. It can exported in part as well as other higher level automation run against it.

High level view

At this point you should have a pretty good feel for how the data collection side of this system worked. After each collector executed for a given target, we had a full set of JSON result documents, queryable graph of related metadata about the crawl, a far more structured/queryable version of the data contained in all the individual dataset_items, and finally all of this was catalogued in AWS Glue's catalog. Now things are bit more friendly to process and sort through at a high level vs going directly to millions of big JSON files.

Why even have the big JSON result documents to begin with? Well by storing in this format, we ensure we have a permanent snapshot for a point in time of an entire target’s “graph of existence” within the scope of the crawled endpoints. Given JSON is fairly ubiquitous, if at some point in the future we need to re-process the data or change the format of dataset_items we can just go back to the JSON and re-process from there.

Great, wow that was a lot of work… but we haven’t even started to look at the meaningful data contained within each one of the dataset_items yet! To get to that point we have more challenges on our hands, which you can read about in part 2 of this series which covers the Layer 2 Type Specific Items (L2TS).

To be continued

The next big part of this architecture was actually digging into the data contained within each dataset_item. For more. on this please continue on to part 2 of this series. Thanks for reading.

Originally published at http://bitsofinfo.wordpress.com on January 23, 2022.

--

--