Source code for topik.fileio.out_elastic

from six import string_types
from six.moves import UserDict
import logging
import time

from elasticsearch import Elasticsearch, helpers

from ._registry import register_output
from .base_output import OutputInterface
from topik.vectorizers.vectorizer_output import VectorizerOutput
from topik.models.base_model_output import ModelOutput

def es_setitem(key, value, doc_type, instance, index, batch_size=1000):
    """Load an iterable of (id, value) pairs into the specified new or existing
    field within existing documents."""
    batch = []
    for id, val in value:
        action = {'_op_type': 'update',
                  '_index': index,
                  '_type': doc_type,
                  '_id': id,
                  'doc': {key: val},
                  'doc_as_upsert': "true",
                  }
        batch.append(action)
        if len(batch) >= batch_size:
            helpers.bulk(client=instance, actions=batch, index=index)
            batch = []
    if batch:
        helpers.bulk(client=instance, actions=batch, index=index)
    instance.indices.refresh(index)
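

# Illustrative usage sketch (not part of the original module): attaching a new
# field to already-indexed documents with es_setitem.  The host, index name,
# doc_type, and data below are assumptions for the example only.
def _example_es_setitem():
    client = Elasticsearch(hosts=["localhost:9200"])            # assumed local cluster
    pairs = [(1, ["some", "tokens"]), (2, ["more", "tokens"])]  # (doc id, value) pairs
    es_setitem("simple_tokenizer", pairs, doc_type="continuum",
               instance=client, index="topik_example")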


def es_getitem(key, doc_type, instance, index, query=None):
    results = helpers.scan(instance, index=index, query=query, doc_type=doc_type)
    for result in results:
        try:
            id = int(result["_id"])
        except ValueError:
            id = result["_id"]
        yield id, result['_source'][key]


class BaseElasticCorpora(UserDict):
    def __init__(self, instance, index, corpus_type, query=None, batch_size=1000):
        self.instance = instance
        self.index = index
        self.corpus_type = corpus_type
        self.query = query
        self.batch_size = batch_size

    def __setitem__(self, key, value):
        es_setitem(key, value, self.corpus_type, self.instance, self.index)

    def __getitem__(self, key):
        return es_getitem(key, self.corpus_type, self.instance, self.index, self.query)
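

# Illustrative usage sketch (not part of the original module): BaseElasticCorpora
# behaves like a dict keyed by pipeline-step name, streaming values to and from
# Elasticsearch.  The host, index name, and data are assumptions for the example only.
def _example_base_elastic_corpora():
    client = Elasticsearch(hosts=["localhost:9200"])     # assumed local cluster
    tokenized = BaseElasticCorpora(client, "topik_example", "tokenized")
    # assignment bulk-upserts the (id, value) pairs under the field "simple_tokenizer"
    tokenized["simple_tokenizer"] = [(1, ["hello", "world"])]
    # lookup yields (id, value) pairs back out of the same field
    for doc_id, tokens in tokenized["simple_tokenizer"]:
        print(doc_id, tokens)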


class VectorizedElasticCorpora(BaseElasticCorpora):
    def __setitem__(self, key, value):
        # id_term_map
        es_setitem(key, value.id_term_map.items(), "term", self.instance, self.index)
        # document_term_counts
        es_setitem(key, value.document_term_counts.items(), "document_term_count",
                   self.instance, self.index)
        # doc_lengths
        es_setitem(key, value.doc_lengths.items(), "document_length",
                   self.instance, self.index)
        # global term frequency
        es_setitem(key, value.term_frequency.items(), "term_frequency",
                   self.instance, self.index)
        # vectors
        es_setitem(key, value.vectors.items(), "vector", self.instance, self.index)
        # could either upload vectors explicitly here (above) or using super (below)
        # super(VectorizedElasticCorpora, self).__setitem__(key, value)

    def __getitem__(self, key):
        # TODO: each of these should be retrieved from a query.  Populate the
        # VectorizerOutput object and return it.  These things can be iterators
        # instead of dicts; VectorizerOutput should not care.

        # id->term map for the full set of unique terms across all docs
        id_term_map = {int(term_id): term
                       for term_id, term in
                       es_getitem(key, "term", self.instance, self.index, self.query)}
        # count of terms associated with each document, e.g. {"doc1": 3, "doc2": 5}
        document_term_count = {int(doc_id): doc_term_count
                               for doc_id, doc_term_count in
                               es_getitem(key, "document_term_count", self.instance,
                                          self.index, self.query)}
        doc_lengths = {int(doc_id): doc_length
                       for doc_id, doc_length in
                       es_getitem(key, "document_length", self.instance,
                                  self.index, self.query)}
        term_frequency = {int(term_id): global_frequency
                          for term_id, global_frequency in
                          es_getitem(key, "term_frequency", self.instance,
                                     self.index, self.query)}
        # vectorized representation of each document, e.g. {"doc1": {1: 3, 2: 1}}
        # (word id is the key, word count is the value for a bag-of-words model)
        vectors = {int(doc_id): {int(term_id): term_weight
                                 for term_id, term_weight in doc_term_weights.items()}
                   for doc_id, doc_term_weights in
                   es_getitem(key, "vector", self.instance, self.index, self.query)}
        return VectorizerOutput(id_term_map=id_term_map,
                                document_term_counts=document_term_count,
                                doc_lengths=doc_lengths,
                                term_frequency=term_frequency,
                                vectors=vectors)
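

# Illustrative usage sketch (not part of the original module): reading a stored
# vectorization back as a VectorizerOutput.  The host, index name, and vectorizer
# name are assumptions, and the sketch assumes VectorizerOutput exposes `vectors`
# as an attribute (as __setitem__ above relies on).
def _example_vectorized_corpora():
    client = Elasticsearch(hosts=["localhost:9200"])     # assumed local cluster
    vectorized = VectorizedElasticCorpora(client, "topik_example", "vectorized")
    vectorizer_output = vectorized["bag_of_words"]       # rebuilt from stored fields
    return vectorizer_output.vectors                     # {doc_id: {term_id: weight}}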


class ModeledElasticCorpora(BaseElasticCorpora):
    def __setitem__(self, key, value):
        es_setitem(key, value.vocab.items(), "term", self.instance, self.index)
        es_setitem(key, value.term_frequency.items(), "term_frequency",
                   self.instance, self.index)
        es_setitem(key, value.topic_term_matrix.items(), "topic_term_dist",
                   self.instance, self.index)
        es_setitem(key, value.doc_lengths.items(), "doc_length",
                   self.instance, self.index)
        es_setitem(key, value.doc_topic_matrix.items(), "doc_topic_dist",
                   self.instance, self.index)

    def __lt__(self, y):
        return super(ModeledElasticCorpora, self).__lt__(y)

    def __getitem__(self, key):
        vocab = {int(term_id): term
                 for term_id, term in
                 es_getitem(key, "term", self.instance, self.index, self.query)}
        term_frequency = {int(term_id): tf
                          for term_id, tf in
                          es_getitem(key, "term_frequency", self.instance,
                                     self.index, self.query)}
        topic_term_matrix = {topic_id: topic_term_dist
                             for topic_id, topic_term_dist in
                             es_getitem(key, "topic_term_dist", self.instance,
                                        self.index, self.query)}
        doc_lengths = {doc_id: doc_length
                       for doc_id, doc_length in
                       es_getitem(key, "doc_length", self.instance,
                                  self.index, self.query)}
        doc_topic_matrix = {int(doc_id): doc_topic_dist
                            for doc_id, doc_topic_dist in
                            es_getitem(key, "doc_topic_dist", self.instance,
                                       self.index, self.query)}
        return ModelOutput(vocab=vocab, term_frequency=term_frequency,
                           topic_term_matrix=topic_term_matrix,
                           doc_lengths=doc_lengths,
                           doc_topic_matrix=doc_topic_matrix)


@register_output
class ElasticSearchOutput(OutputInterface):
    def __init__(self, source, index, hash_field=None, doc_type='continuum',
                 query=None, iterable=None, filter_expression="",
                 vectorized_corpora=None, tokenized_corpora=None,
                 modeled_corpora=None, **kwargs):
        super(ElasticSearchOutput, self).__init__()
        self.hosts = source
        self.instance = Elasticsearch(hosts=source, **kwargs)
        self.index = index
        self.doc_type = doc_type
        self.query = query
        self.hash_field = hash_field
        if iterable:
            self.import_from_iterable(iterable, hash_field)
        self.filter_expression = filter_expression
        self.tokenized_corpora = tokenized_corpora if tokenized_corpora else \
            BaseElasticCorpora(self.instance, self.index, 'tokenized', self.query)
        self.vectorized_corpora = vectorized_corpora if vectorized_corpora else \
            VectorizedElasticCorpora(self.instance, self.index, 'vectorized', self.query)
        self.modeled_corpora = modeled_corpora if modeled_corpora else \
            ModeledElasticCorpora(self.instance, self.index, "models", self.query)

    @property
    def filter_string(self):
        return self.filter_expression

    def import_from_iterable(self, iterable, field_to_hash='text', batch_size=500):
        """Load data into Elasticsearch from iterable.

        iterable: generally a list of dicts, but possibly a list of strings
            This is your data.  Your dictionary structure defines the schema
            of the elasticsearch index.
        field_to_hash: string identifier of field to hash for content ID.  For
            list of dicts, a valid key value in the dictionary is required.  For
            list of strings, a dictionary with one key, "text" is created and
            used.
        """
        if field_to_hash:
            self.hash_field = field_to_hash
            batch = []
            for item in iterable:
                if isinstance(item, string_types):
                    item = {field_to_hash: item}
                id = hash(item[field_to_hash])
                action = {'_op_type': 'update',
                          '_index': self.index,
                          '_type': self.doc_type,
                          '_id': id,
                          'doc': item,
                          'doc_as_upsert': "true",
                          }
                batch.append(action)
                if len(batch) >= batch_size:
                    helpers.bulk(client=self.instance, actions=batch, index=self.index)
                    batch = []
            if batch:
                helpers.bulk(client=self.instance, actions=batch, index=self.index)
            self.instance.indices.refresh(self.index)
        else:
            raise ValueError("A field_to_hash is required for import_from_iterable")

    def convert_date_field_and_reindex(self, field):
        index = self.index
        if self.instance.indices.get_field_mapping(fields=[field],
                                                   index=index,
                                                   doc_type=self.doc_type) != 'date':
            index = self.index + "_{}_alias_date".format(field)
            if not self.instance.indices.exists(index) or \
                    self.instance.indices.get_field_mapping(fields=[field],
                                                            index=index,
                                                            doc_type=self.doc_type) != 'date':
                mapping = self.instance.indices.get_mapping(index=self.index,
                                                            doc_type=self.doc_type)
                mapping[self.index]["mappings"][self.doc_type]["properties"][field] = {"type": "date"}
                self.instance.indices.put_alias(index=self.index,
                                                name=index,
                                                body=mapping)
                self.instance.indices.refresh(index)
                while self.instance.count(index=self.index) != self.instance.count(index=index):
                    logging.info("Waiting for date indexed data to be indexed...")
                    time.sleep(1)
        return index

    # TODO: validate input data to ensure that it has valid year data
    def get_date_filtered_data(self, field_to_get, start, end, filter_field="date"):
        converted_index = self.convert_date_field_and_reindex(field=filter_field)
        results = helpers.scan(self.instance, index=converted_index,
                               doc_type=self.doc_type,
                               query={"query": {"filtered": {"filter": {"range":
                                      {filter_field: {"gte": start, "lte": end}}}}}})
        for result in results:
            yield result["_id"], result['_source'][field_to_get]

    def get_filtered_data(self, field_to_get, filter=""):
        results = helpers.scan(self.instance, index=self.index, query=self.query,
                               doc_type=self.doc_type)
        for result in results:
            yield result["_id"], result['_source'][field_to_get]

    def save(self, filename, saved_data=None):
        if saved_data is None:
            saved_data = {"source": self.hosts, "index": self.index,
                          "hash_field": self.hash_field,
                          "doc_type": self.doc_type, "query": self.query}
        return super(ElasticSearchOutput, self).save(filename, saved_data)

    def synchronize(self, max_wait, field):
        # TODO: change this to a more general condition for wider use, including read_input
        # could just pass in a string condition and then 'while not eval(condition)'
        count_not_yet_updated = -1
        while count_not_yet_updated != 0:
            count_not_yet_updated = self.instance.count(index=self.index,
                                                        doc_type=self.doc_type,
                                                        body={"query": {"constant_score": {
                                                            "filter": {"missing": {
                                                                "field": field}}}}})['count']
            logging.debug("Count not yet updated: {}".format(count_not_yet_updated))
            time.sleep(0.01)
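

# Illustrative end-to-end sketch (not part of the original module): constructing an
# ElasticSearchOutput, loading raw documents with import_from_iterable, and reading
# back a date-filtered field.  The host, index name, document contents, and year
# bounds are assumptions for the example only.
def _example_elasticsearch_output():
    output = ElasticSearchOutput(source=["localhost:9200"],    # assumed local cluster
                                 index="topik_example",
                                 hash_field="text")
    # a list of dicts defines the index schema; plain strings would be wrapped
    # as {"text": <string>} by import_from_iterable
    documents = [{"text": "first document", "year": 2001},
                 {"text": "second document", "year": 2002}]
    output.import_from_iterable(documents, field_to_hash="text")
    # stream (id, text) pairs for documents whose "year" falls within the range
    for doc_id, text in output.get_date_filtered_data("text", start=2000, end=2010,
                                                      filter_field="year"):
        print(doc_id, text)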