Data Import

Data import loads your data from some external representation into an iterable, internal representation for Topik. The main front end for importing data is the read_input() function:

>>> from topik import read_input
>>> corpus = read_input(source="data_file.json", content_field="text")

read_input() is a front end to several potential reader backends. Presently, it attempts to recognize which backend to use based on characteristics of the source string you pass in. These criteria are:

  • ends with .js or .json: treat as a JSON stream filename first, falling back to “large JSON” (such as a file generated by esdump).
  • contains 8983: treat as solr connection address (8983 is the default solr port).
  • contains 9200: treat as Elasticsearch connection address (9200 is the default Elasticsearch port).
  • result of os.path.splitext(source)[1] is “” (no file extension): treat as a folder of files. Each file is considered raw text, and its contents are stored under the key given by content_field. Files may be gzipped. See the example below.
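
For example, to read a folder of raw text files you might do the following (the directory name my_documents is just a placeholder for your own data directory):

>>> corpus = read_input(source="my_documents", content_field="text")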

Any of the backends can also be forced by passing the source_type argument with one of the following string values (see the example after this list):

  • solr
  • elastic
  • json_stream
  • large_json
  • folder
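
For example, to force the “large_json” reader on a dump file (the filename esdump_output.json here is hypothetical), you could write:

>>> corpus = read_input(source="esdump_output.json", source_type="large_json",
                        content_field="text")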

The content_field argument is mandatory: it specifies where the actual content to be analyzed is drawn from. For all hierarchical data sources (everything except folders), this selects a subfield of the data you feed in; for folders, it is the key under which each file's contents are stored.
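
For example, given a hypothetical file records.json containing records such as:

[ {"title": "example", "text": "I am the text you're looking for."} ]

the text to analyze would be selected with:

>>> corpus = read_input(source="records.json", content_field="text")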

JSON additional options

For JSON stream and “large JSON” inputs, an additional keyword argument, json_prefix, may be passed: the period-separated path leading to the content field. Use this when the content field is not at the root level of the JSON document. For example, given the JSON content:

[ {"nested": {"dictionary": {"text": "I am the text you're looking for."} } } ]

You would read using the following json_prefix argument:

>>> corpus = read_input(source="data_file.json", content_field="text",
                        json_prefix="nested.dictionary")

Elasticsearch additional options and notes

The Elasticsearch importer expects a full string specifying the Elasticsearch server address; the index to access is given by the index keyword argument, as shown below. All results returned from the Elasticsearch query contain only the contents of the ‘_source’ field.

>>> corpus = read_input(source="https://localhost:9200", index="test_index", content_field="text")

Extra keyword arguments are forwarded to the Elasticsearch instance creation. This can be used to supply additional login parameters, for example, to use SSL:

>>> corpus = read_input(source="https://user:secret@localhost:9200",
                        index="test_index", content_field="text", use_ssl=True)

The source argument for Elasticsearch also supports multiple servers, though this requires that you manually specify the ‘elastic’ source_type:

>>> corpus = read_input(source=["https://server1", "https://server2"],
                        index="test_index", source_type="elastic", content_field="text")

For more information on server options, please refer to Elasticsearch’s documentation.

Extra keyword arguments are also passed to the scroll helper that returns results. Of special note, a query keyword argument can be passed to limit the records imported from the server. The query must follow the Elasticsearch query DSL; for more information, please refer to Elasticsearch’s DSL docs.

>>> query = '{"filtered": {"query": {"match": { "tweet": "full text search"}}}}'
>>> corpus = read_input(source="https://localhost:9200", index="test_index",
                        content_field="tweet", query=query)

Output formats

Output formats determine how your data are represented for further processing and modeling. To ensure a uniform interface, output formats implement the interface described by CorpusInterface. Presently, two such backends are implemented: DictionaryCorpus and ElasticSearchCorpus. Available outputs can be examined by checking the keys of the registered_outputs dictionary:

>>> from topik import registered_outputs
>>> list(registered_outputs.keys())

The default output is DictionaryCorpus; no additional arguments are necessary. DictionaryCorpus stores everything in a Python dictionary, and as such is memory intensive. All operations on a DictionaryCorpus block until complete. It is the simplest backend to use, but it will ultimately limit the size of the analyses you can perform.
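
As a minimal sketch of working with the default output, the corpus returned by read_input() can be iterated over directly (exactly what each iteration yields depends on the corpus implementation, so treat this as illustrative only):

>>> corpus = read_input(source="data_file.json", content_field="text")
>>> for item in corpus:
...     print(item)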

The ElasticSearchCorpus can be specified to read_input() using the output_type argument. It must be accompanied by another keyword argument, output_args, which should be a dictionary containing connection details and any additional arguments.

>>> output_args = {"source": "localhost", "index": "destination_index"}
>>> raw_data = read_input("test_data.json", output_type='elastic',
                          output_args=output_args, content_field="text")

ElasticSearchCorpus stores everything in an Elasticsearch instance that you specify. Operations do not block, and have “eventual consistency”: the corpus will eventually have all of the documents you sent available, but not necessarily immediately after the read_input function returns. This lag time is due to Elasticsearch indexing the data on the server side.

Synchronous wait

As mentioned above, some output formats are not immediately ready for consumption after loading data. For example, after sending data to Elasticsearch, Elasticsearch will take some time to index that data. Until that indexing is complete, that data will not show up in iterations over the corpus. To force your program to wait for this to finish, use the synchronous_wait argument to read_input:

>>> output_args = {"source": "localhost", "index": "destination_index"}
>>> raw_data = read_input("test_data.json", output_type='elastic',
                          output_args=output_args, content_field="text",
                          synchronous_wait=30)

This example will wait up to 30 seconds for the Elasticsearch indexing to stabilize. Stability is evaluated as the point at which the number of documents in the output has not changed for 1 second. If the number of documents has not stabilized by the end of the synchronous wait period, you will get a warning message, but execution will proceed.
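
Conceptually, the wait behaves roughly like the following sketch. This is not Topik's actual implementation; count_documents is a hypothetical stand-in for however the output backend reports its current document count:

import time

def wait_for_stable(count_documents, max_wait=30):
    # Poll the document count once per second; the output is considered
    # stable once the count has not changed between two consecutive polls.
    deadline = time.time() + max_wait
    last_count = count_documents()
    while time.time() < deadline:
        time.sleep(1)
        current = count_documents()
        if current == last_count:
            return True   # stable: unchanged after 1 second
        last_count = current
    return False          # did not stabilize within max_wait; warn and proceed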

This is a property only of output formats. Input has no wait associated with it, because the source is assumed to be “complete” when you ask for it. Please make sure that this is true, or your results will be ill-defined and impossible to reproduce.

Saving and loading corpora

The output object of any read_input() step is saveable and loadable. This allows you to quickly get back to any filtered state you may have applied to some larger corpus, and also ensures that the corpus you load with a model is consistent with the corpus that was used to create that model. To save a corpus, call its save() method:

>>> raw_data.save("output_filename")

The saved file format is JSON. Depending on the exact class of your corpus, more or less data may be saved to this JSON file. For example, DictionaryCorpus saves all of its corpus data to the file, which can therefore be quite large; ElasticSearchCorpus saves only connection details and filtering metadata, so its file is much smaller.

Loading corpora is achieved using the load_persisted_corpus() function. This function returns the appropriate Corpus object, based on metadata in the JSON file.

>>> from topik.intermediaries.raw_data import load_persisted_corpus
>>> raw_data = load_persisted_corpus("output_filename")