"""
This file is concerned with providing a simple interface for data stored in
Elasticsearch. The class(es) defined here are fed into the preprocessing step.
"""
from abc import ABCMeta, abstractmethod, abstractproperty
import logging
import time

from six import string_types, with_metaclass

from topik.intermediaries.persistence import Persistor
from topik.tokenizers import tokenizer_methods
from topik.intermediaries.digested_document_collection import DigestedDocumentCollection
# Registry mapping class-key strings to output (corpus) classes.
registered_outputs = {}


def register_output(cls):
    """Class decorator: record *cls* in the global output registry.

    The registry key is whatever ``cls.class_key()`` returns.  If that key
    is already registered, the existing entry is kept (first wins).
    """
    key = cls.class_key()
    if key not in registered_outputs:
        registered_outputs[key] = cls
    return cls
def _get_hash_identifier(input_data, id_field):
return hash(input_data[id_field])
def _get_parameters_string(**kwargs):
"""Used to create identifiers for output"""
id = ''.join('{}={}_'.format(key, val) for key, val in sorted(kwargs.items()))
return id[:-1]
class CorpusInterface(with_metaclass(ABCMeta)):
    """Abstract interface for corpus backends fed into the preprocessing step.

    Subclasses provide iteration over (doc_id, content) pairs, storage of
    preprocessed output alongside input data, and persistence bootstrap
    metadata (via save/class_key) so corpora can be reloaded later.
    """

    def __init__(self):
        super(CorpusInterface, self).__init__()
        # Persistor stores the bootstrap metadata written by save().
        self.persistor = Persistor()

    @classmethod
    @abstractmethod
    def class_key(cls):
        """Implement this method to return the string ID with which to store your class."""
        raise NotImplementedError

    @abstractmethod
    def __iter__(self):
        """This is expected to iterate over your data, returning tuples of (doc_id, <selected field>)"""
        raise NotImplementedError

    @abstractmethod
    def __len__(self):
        # Number of documents in the corpus.
        raise NotImplementedError

    @abstractmethod
    def get_generator_without_id(self, field=None):
        """Returns a generator that yields field content without doc_id associated"""
        raise NotImplementedError

    @abstractmethod
    def append_to_record(self, record_id, field_name, field_value):
        """Used to store preprocessed output alongside input data.

        Field name is destination. Value is processed value."""
        raise NotImplementedError

    @abstractmethod
    def get_date_filtered_data(self, start, end, field):
        # Return a corpus restricted to records whose `field` lies in [start, end].
        raise NotImplementedError

    @abstractproperty
    def filter_string(self):
        # String describing any active filter; used to build unique identifiers.
        raise NotImplementedError

    def save(self, filename, saved_data=None):
        """Persist this object to disk somehow.

        You can save your data in any number of files in any format, but at a minimum, you need one json file that
        describes enough to bootstrap the loading process. Namely, you must have a key called 'class' so that upon
        loading the output, the correct class can be instantiated and used to load any other data. You don't have
        to implement anything for saved_data, but it is stored as a key next to 'class'.
        """
        self.persistor.store_corpus({"class": self.__class__.class_key(), "saved_data": saved_data})
        self.persistor.persist_data(filename)

    def synchronize(self, max_wait, field):
        """By default, operations are synchronous and no additional wait is
        necessary. Data sources that are asynchronous (ElasticSearch) may
        use this function to wait for "eventual consistency" """
        pass

    def tokenize(self, method="simple", synchronous_wait=30, **kwargs):
        """Convert data to lowercase; tokenize; create bag of words collection.

        Output from this function is used as input to modeling steps.

        raw_data: iterable corpus object containing the text to be processed.
            Each iteration call should return a new document's content.
        tokenizer_method: string id of tokenizer to use. For keys, see
            topik.tokenizers.tokenizer_methods (which is a dictionary of classes)
        kwargs: arbitrary dictionary of extra parameters. These are passed both
            to the tokenizer and to the vectorizer steps.
        """
        # Parameter string makes the token field name unique per tokenizer config.
        parameters_string = _get_parameters_string(method=method, **kwargs)
        token_path = "tokens_"+parameters_string
        for record_id, raw_record in self:
            tokenized_record = tokenizer_methods[method](raw_record,
                                                         **kwargs)
            # TODO: would be nice to aggregate batches and append in bulk
            self.append_to_record(record_id, token_path, tokenized_record)
        # Asynchronous backends (ES) block here until all writes are visible.
        self.synchronize(max_wait=synchronous_wait, field=token_path)
        # NOTE(review): relies on subclasses providing get_field(); it is not
        # declared abstract on this interface -- confirm all backends define it.
        return DigestedDocumentCollection(self.get_field(field=token_path))
@register_output
class ElasticSearchCorpus(CorpusInterface):
    """Corpus backed by an Elasticsearch index.

    Iteration streams documents with the scan helper; preprocessing results
    are written back to the same index via partial-update requests.
    """

    def __init__(self, source, index, content_field, doc_type=None, query=None, iterable=None,
                 filter_expression="", **kwargs):
        """
        source: host (or list of hosts) for the Elasticsearch client.
        index: name of the ES index to read from / write to.
        content_field: document field whose text is iterated by default.
        doc_type: optional ES document type used when querying.
        query: optional ES query dict applied when iterating.
        iterable: optional data to load into the index on construction.
        filter_expression: string describing any active filter (used for IDs).
        kwargs: passed through to the Elasticsearch client constructor.
        """
        from elasticsearch import Elasticsearch
        super(ElasticSearchCorpus, self).__init__()
        self.hosts = source
        self.instance = Elasticsearch(hosts=source, **kwargs)
        self.index = index
        self.content_field = content_field
        self.doc_type = doc_type
        self.query = query
        if iterable:
            self.import_from_iterable(iterable, content_field)
        self.filter_expression = filter_expression

    @classmethod
    def class_key(cls):
        """String ID under which this class is registered."""
        return "elastic"

    @property
    def filter_string(self):
        return self.filter_expression

    def __iter__(self):
        """Yield (doc_id, content) tuples by scanning the whole index."""
        from elasticsearch import helpers
        results = helpers.scan(self.instance, index=self.index,
                               query=self.query, doc_type=self.doc_type)
        for result in results:
            yield result["_id"], result['_source'][self.content_field]

    def __len__(self):
        return self.instance.count(index=self.index, doc_type=self.doc_type)["count"]

    def get_generator_without_id(self, field=None):
        """Yield field content only (no doc ids); defaults to content_field."""
        if not field:
            field = self.content_field
        for _, result in ElasticSearchCorpus(self.hosts, self.index, field, self.doc_type, self.query):
            yield result

    def append_to_record(self, record_id, field_name, field_value):
        """Store a processed value on an existing document via partial update."""
        # NOTE(review): doc_type is hard-coded to "continuum" (matching
        # import_from_iterable) rather than self.doc_type -- confirm intended.
        self.instance.update(index=self.index, id=record_id, doc_type="continuum",
                             body={"doc": {field_name: field_value}})

    def get_field(self, field=None):
        """Get a different field to iterate over, keeping all other
        connection details."""
        if not field:
            field = self.content_field
        return ElasticSearchCorpus(self.hosts, self.index, field, self.doc_type, self.query)

    def import_from_iterable(self, iterable, id_field="text", batch_size=500):
        """Load data into Elasticsearch from iterable.

        iterable: generally a list of dicts, but possibly a list of strings
            This is your data. Your dictionary structure defines the schema
            of the elasticsearch index.
        id_field: string identifier of field to hash for content ID. For
            list of dicts, a valid key value in the dictionary is required. For
            list of strings, a dictionary with one key, "text" is created and
            used.
        batch_size: number of documents sent to ES per bulk request.
        """
        from elasticsearch import helpers
        batch = []
        for item in iterable:
            # string_types (six) covers both py2 str/unicode and py3 str;
            # the previous `basestring` check raised NameError on Python 3.
            if isinstance(item, string_types):
                item = {id_field: item}
            # renamed from `id` to avoid shadowing the builtin
            doc_id = _get_hash_identifier(item, id_field)
            action = {'_op_type': 'update',
                      '_index': self.index,
                      '_type': 'continuum',
                      '_id': doc_id,
                      'doc': item,
                      'doc_as_upsert': "true",
                      }
            batch.append(action)
            if len(batch) >= batch_size:
                helpers.bulk(client=self.instance, actions=batch, index=self.index)
                batch = []
        if batch:  # flush any trailing partial batch
            helpers.bulk(client=self.instance, actions=batch, index=self.index)
        self.instance.indices.refresh(self.index)

    def convert_date_field_and_reindex(self, field):
        """Ensure `field` is mapped as a date.

        If the current mapping is not 'date', create (if needed) a date-typed
        alias index and wait until it holds as many documents as the source
        index.  Returns the name of the index to query against.
        """
        index = self.index
        if self.instance.indices.get_field_mapping(field=field,
                                                   index=index,
                                                   doc_type="continuum") != 'date':
            index = self.index+"_{}_alias_date".format(field)
            if not self.instance.indices.exists(index) or self.instance.indices.get_field_mapping(
                    field=field, index=index, doc_type="continuum") != 'date':
                mapping = self.instance.indices.get_mapping(index=self.index,
                                                            doc_type="continuum")
                # force the field to a date type in the copied mapping
                mapping[self.index]["mappings"]["continuum"]["properties"][field] = {"type": "date"}
                self.instance.indices.put_alias(index=self.index,
                                                name=index,
                                                body=mapping)
            self.instance.indices.refresh(index)
            # Poll until the alias index has caught up with the source index.
            while self.instance.count(index=self.index) != self.instance.count(index=index):
                logging.info("Waiting for date indexed data to be indexed...")
                time.sleep(1)
        return index

    # TODO: validate input data to ensure that it has valid year data
    def get_date_filtered_data(self, start, end, field="date"):
        """Return a new corpus restricted to docs whose `field` is in [start, end]."""
        converted_index = self.convert_date_field_and_reindex(field=field)
        return ElasticSearchCorpus(self.hosts, converted_index, self.content_field, self.doc_type,
                                   query={"query": {"filtered": {"filter": {"range": {field: {"gte": start,
                                                                                              "lte": end}}}}}},
                                   filter_expression=self.filter_expression + "_date_{}_{}".format(start, end))

    def save(self, filename, saved_data=None):
        """Persist connection details so the corpus can be reconstructed on load."""
        if saved_data is None:
            saved_data = {"source": self.hosts, "index": self.index, "content_field": self.content_field,
                          "doc_type": self.doc_type, "query": self.query}
        return super(ElasticSearchCorpus, self).save(filename, saved_data)

    def synchronize(self, max_wait, field):
        """Block until no documents are missing `field` (ES writes are
        eventually consistent, so updates may lag behind)."""
        # TODO: change this to a more general condition for wider use, including read_input
        # could just pass in a string condition and then 'while not eval(condition)'
        # NOTE(review): max_wait is currently unused; the loop waits indefinitely.
        count_not_yet_updated = -1
        while count_not_yet_updated != 0:
            count_not_yet_updated = self.instance.count(index=self.index,
                                                        doc_type=self.doc_type,
                                                        body={"query": {
                                                            "constant_score": {
                                                                "filter": {
                                                                    "missing": {
                                                                        "field": field}}}}})['count']
            # lazy %-formatting so the string is only built when DEBUG is on
            logging.debug("Count not yet updated: %s", count_not_yet_updated)
            time.sleep(0.01)
@register_output
class DictionaryCorpus(CorpusInterface):
    """In-memory corpus.

    Documents are stored as a list of dicts shaped like Elasticsearch hits:
    ``{"_id": <hash>, "_source": {<your data>}}``.
    """

    def __init__(self, content_field, iterable=None, generate_id=True, reference_field=None, content_filter=None):
        super(DictionaryCorpus, self).__init__()
        self.content_field = content_field
        self._documents = []
        self.idx = 0
        # When a reference_field is given, import keyed on it, then restore
        # the requested content_field afterwards.
        restore_field = None
        if reference_field:
            self.reference_field = reference_field
            restore_field = content_field
            content_field = reference_field
        else:
            self.reference_field = content_field
        if iterable:
            self.import_from_iterable(iterable, content_field, generate_id)
        if restore_field:
            self.content_field = restore_field
        self.content_filter = content_filter

    @classmethod
    def class_key(cls):
        """String ID under which this class is registered."""
        return "dictionary"

    def __iter__(self):
        # SECURITY NOTE: content_filter expressions are eval'd -- only use
        # with trusted, internally-generated filter strings.
        for entry in self._documents:
            source = entry["_source"]
            if self.content_filter and not eval(
                    self.content_filter["expression"].format(source[self.content_filter["field"]])):
                continue
            yield entry["_id"], source[self.content_field]

    def __len__(self):
        return len(self._documents)

    @property
    def filter_string(self):
        if not self.content_filter:
            return ""
        return self.content_filter["expression"].format(self.content_filter["field"])

    def append_to_record(self, record_id, field_name, field_value):
        """Attach a processed value to the document with the given id."""
        target = next((doc for doc in self._documents if doc["_id"] == record_id), None)
        if target is None:
            raise ValueError("No record with id '{}' was found.".format(record_id))
        target["_source"][field_name] = field_value

    def get_field(self, field=None):
        """Get a different field to iterate over, keeping all other details."""
        return DictionaryCorpus(content_field=field or self.content_field,
                                iterable=self._documents,
                                generate_id=False, reference_field=self.content_field)

    def get_generator_without_id(self, field=None):
        """Yield raw field content (no ids); defaults to content_field."""
        wanted = field or self.content_field
        for entry in self._documents:
            yield entry["_source"][wanted]

    def import_from_iterable(self, iterable, content_field, generate_id=True):
        """
        iterable: generally a list of dicts, but possibly a list of strings
            This is your data. Your dictionary structure defines the schema
            of the elasticsearch index.
        """
        if generate_id:
            docs = []
            for doc in iterable:
                docs.append({"_id": hash(doc[content_field]), "_source": doc})
            self._documents = docs
            self.reference_field = content_field
        else:
            self._documents = list(iterable)

    # TODO: generalize for datetimes
    # TODO: validate input data to ensure that it has valid year data
    def get_date_filtered_data(self, start, end, field="year"):
        # Filter expression is later eval'd per-document in __iter__.
        expression = "{}<=int({})<={}".format(start, "{}", end)
        return DictionaryCorpus(content_field=field, iterable=self._documents,
                                generate_id=False, reference_field=self.content_field,
                                content_filter={"field": field, "expression": expression})

    def save(self, filename, saved_data=None):
        """Persist documents and field configuration for later reload."""
        if saved_data is None:
            saved_data = {"reference_field": self.reference_field, "content_field": self.content_field,
                          "iterable": [doc["_source"] for doc in self._documents]}
        return super(DictionaryCorpus, self).save(filename, saved_data)
def load_persisted_corpus(filename):
    """Re-create a corpus object previously stored with CorpusInterface.save.

    The persisted metadata names the registered class ('class') and its
    constructor arguments ('saved_data').
    """
    stored = Persistor(filename).get_corpus_dict()
    corpus_class = registered_outputs[stored['class']]
    return corpus_class(**stored["saved_data"])