Source code for octis.models.CTM

from sklearn.feature_extraction.text import CountVectorizer

from octis.models.model import AbstractModel
from octis.models.contextualized_topic_models.datasets import dataset
from octis.models.contextualized_topic_models.models import ctm
from octis.models.contextualized_topic_models.utils.data_preparation import (
    bert_embeddings_from_list)

import os
import pickle as pkl
import torch
import numpy as np
import random


class CTM(AbstractModel):

    def __init__(
            self, num_topics=10, model_type='prodLDA', activation='softplus',
            dropout=0.2, learn_priors=True, batch_size=64, lr=2e-3,
            momentum=0.99, solver='adam', num_epochs=100,
            reduce_on_plateau=False, prior_mean=0.0, prior_variance=None,
            num_layers=2, num_neurons=100, seed=None, use_partitions=True,
            num_samples=10, inference_type="zeroshot", bert_path="",
            bert_model="bert-base-nli-mean-tokens"):
        """
        initialization of CTM

        :param num_topics: int, number of topic components (default 10)
        :param model_type: string, 'prodLDA' or 'LDA' (default 'prodLDA')
        :param activation: string, 'softplus', 'relu', 'sigmoid', 'swish',
            'tanh', 'leakyrelu', 'rrelu', 'elu', 'selu' (default 'softplus')
        :param num_layers: int, number of layers (default 2)
        :param dropout: float, dropout to use (default 0.2)
        :param learn_priors: bool, make priors a learnable parameter
            (default True)
        :param batch_size: int, size of batch to use for training (default 64)
        :param lr: float, learning rate to use for training (default 2e-3)
        :param momentum: float, momentum to use for training (default 0.99)
        :param solver: string, optimizer 'adam' or 'sgd' (default 'adam')
        :param num_epochs: int, number of epochs to train for (default 100)
        :param num_samples: int, number of times theta needs to be sampled
            (default: 10)
        :param seed: int, the random seed. Not used if None (default None)
        :param use_partitions: bool, if True the model will be trained on the
            training set and evaluated on the test set (default: True)
        :param reduce_on_plateau: bool, reduce learning rate by 10x on
            plateau of 10 epochs (default False)
        :param inference_type: the type of the CTM model. It can be
            "zeroshot" or "combined" (default "zeroshot")
        :param bert_path: path to store the document contextualized
            representations
        :param bert_model: name of the contextualized model
            (default: bert-base-nli-mean-tokens).
            See https://www.sbert.net/docs/pretrained_models.html
        """
        super().__init__()

        self.hyperparameters['num_topics'] = num_topics
        self.hyperparameters['model_type'] = model_type
        self.hyperparameters['activation'] = activation
        self.hyperparameters['dropout'] = dropout
        self.hyperparameters['inference_type'] = inference_type
        self.hyperparameters['learn_priors'] = learn_priors
        self.hyperparameters['batch_size'] = batch_size
        self.hyperparameters['lr'] = lr
        self.hyperparameters['num_samples'] = num_samples
        self.hyperparameters['momentum'] = momentum
        self.hyperparameters['solver'] = solver
        self.hyperparameters['num_epochs'] = num_epochs
        self.hyperparameters['reduce_on_plateau'] = reduce_on_plateau
        self.hyperparameters["prior_mean"] = prior_mean
        self.hyperparameters["prior_variance"] = prior_variance
        self.hyperparameters["num_neurons"] = num_neurons
        self.hyperparameters["bert_path"] = bert_path
        self.hyperparameters["num_layers"] = num_layers
        self.hyperparameters["bert_model"] = bert_model
        self.hyperparameters["seed"] = seed
        self.use_partitions = use_partitions

        # the inference network uses num_layers hidden layers of num_neurons each
        hidden_sizes = tuple([num_neurons for _ in range(num_layers)])
        self.hyperparameters['hidden_sizes'] = tuple(hidden_sizes)

        self.model = None
        self.vocab = None
    def train_model(self, dataset, hyperparameters=None, top_words=10):
        """
        trains CTM model

        :param dataset: octis Dataset for training the model
        :param hyperparameters: dict, with (optionally) the hyperparameters
            of the model to update before training
        :param top_words: number of top-n words of the topics (default 10)
        """
        if hyperparameters is None:
            hyperparameters = {}

        self.set_params(hyperparameters)
        self.vocab = dataset.get_vocabulary()
        self.set_seed(seed=self.hyperparameters['seed'])

        if self.use_partitions:
            train, validation, test = dataset.get_partitioned_corpus(
                use_validation=True)

            data_corpus_train = [' '.join(i) for i in train]
            data_corpus_test = [' '.join(i) for i in test]
            data_corpus_validation = [' '.join(i) for i in validation]

            # build bag-of-words and contextualized representations for each split
            x_train, x_test, x_valid, input_size = self.preprocess(
                self.vocab, data_corpus_train, test=data_corpus_test,
                validation=data_corpus_validation,
                bert_train_path=(
                    self.hyperparameters['bert_path'] + "_train.pkl"),
                bert_test_path=self.hyperparameters['bert_path'] + "_test.pkl",
                bert_val_path=self.hyperparameters['bert_path'] + "_val.pkl",
                bert_model=self.hyperparameters["bert_model"])

            self.model = ctm.CTM(
                input_size=input_size,
                bert_input_size=x_train.X_bert.shape[1],
                model_type=self.hyperparameters['model_type'],
                num_topics=self.hyperparameters['num_topics'],
                dropout=self.hyperparameters['dropout'],
                activation=self.hyperparameters['activation'],
                lr=self.hyperparameters['lr'],
                inference_type=self.hyperparameters['inference_type'],
                hidden_sizes=self.hyperparameters['hidden_sizes'],
                solver=self.hyperparameters['solver'],
                momentum=self.hyperparameters['momentum'],
                num_epochs=self.hyperparameters['num_epochs'],
                learn_priors=self.hyperparameters['learn_priors'],
                batch_size=self.hyperparameters['batch_size'],
                num_samples=self.hyperparameters['num_samples'],
                topic_prior_mean=self.hyperparameters["prior_mean"],
                reduce_on_plateau=self.hyperparameters['reduce_on_plateau'],
                topic_prior_variance=self.hyperparameters["prior_variance"],
                top_words=top_words)

            self.model.fit(x_train, x_valid, verbose=False)
            result = self.inference(x_test)
            return result
        else:
            data_corpus = [' '.join(i) for i in dataset.get_corpus()]
            x_train, input_size = self.preprocess(
                self.vocab, train=data_corpus,
                bert_train_path=(
                    self.hyperparameters['bert_path'] + "_train.pkl"),
                bert_model=self.hyperparameters["bert_model"])

            self.model = ctm.CTM(
                input_size=input_size,
                bert_input_size=x_train.X_bert.shape[1],
                model_type=self.hyperparameters['model_type'],
                num_topics=self.hyperparameters['num_topics'],
                dropout=self.hyperparameters['dropout'],
                activation=self.hyperparameters['activation'],
                lr=self.hyperparameters['lr'],
                inference_type=self.hyperparameters['inference_type'],
                hidden_sizes=self.hyperparameters['hidden_sizes'],
                solver=self.hyperparameters['solver'],
                momentum=self.hyperparameters['momentum'],
                num_epochs=self.hyperparameters['num_epochs'],
                learn_priors=self.hyperparameters['learn_priors'],
                batch_size=self.hyperparameters['batch_size'],
                num_samples=self.hyperparameters['num_samples'],
                topic_prior_mean=self.hyperparameters["prior_mean"],
                reduce_on_plateau=self.hyperparameters['reduce_on_plateau'],
                topic_prior_variance=self.hyperparameters["prior_variance"],
                top_words=top_words)

            self.model.fit(x_train, None, verbose=False)
            result = self.model.get_info()
            return result
    def set_params(self, hyperparameters):
        for k in hyperparameters.keys():
            if k in self.hyperparameters.keys() and k != 'hidden_sizes':
                self.hyperparameters[k] = hyperparameters.get(
                    k, self.hyperparameters[k])

        # hidden_sizes is always re-derived from num_neurons and num_layers
        self.hyperparameters['hidden_sizes'] = tuple(
            [self.hyperparameters["num_neurons"] for _ in range(
                self.hyperparameters["num_layers"])])

    def inference(self, x_test):
        assert isinstance(self.use_partitions, bool) and self.use_partitions
        results = self.model.predict(x_test)
        return results

    def partitioning(self, use_partitions=False):
        self.use_partitions = use_partitions

    @staticmethod
    def set_seed(seed=None):
        if seed is not None:
            torch.manual_seed(seed)
            torch.cuda.manual_seed(seed)
            np.random.seed(seed)
            random.seed(seed)
            torch.backends.cudnn.enabled = False
            torch.backends.cudnn.deterministic = True

    @staticmethod
    def preprocess(vocab, train, bert_model, test=None, validation=None,
                   bert_train_path=None, bert_test_path=None,
                   bert_val_path=None):
        vocab2id = {w: i for i, w in enumerate(vocab)}
        vec = CountVectorizer(
            vocabulary=vocab2id, token_pattern=r'(?u)\b[\w+|\-]+\b')
        entire_dataset = train.copy()
        if test is not None:
            entire_dataset.extend(test)
        if validation is not None:
            entire_dataset.extend(validation)

        vec.fit(entire_dataset)
        idx2token = {v: k for (k, v) in vec.vocabulary_.items()}

        # bag-of-words plus contextualized embeddings for the training split
        x_train = vec.transform(train)
        b_train = CTM.load_bert_data(bert_train_path, train, bert_model)
        train_data = dataset.CTMDataset(x_train.toarray(), b_train, idx2token)
        input_size = len(idx2token.keys())

        if test is not None and validation is not None:
            x_test = vec.transform(test)
            b_test = CTM.load_bert_data(bert_test_path, test, bert_model)
            test_data = dataset.CTMDataset(
                x_test.toarray(), b_test, idx2token)

            x_valid = vec.transform(validation)
            b_val = CTM.load_bert_data(bert_val_path, validation, bert_model)
            valid_data = dataset.CTMDataset(
                x_valid.toarray(), b_val, idx2token)
            return train_data, test_data, valid_data, input_size

        if test is None and validation is not None:
            x_valid = vec.transform(validation)
            b_val = CTM.load_bert_data(bert_val_path, validation, bert_model)
            valid_data = dataset.CTMDataset(
                x_valid.toarray(), b_val, idx2token)
            return train_data, valid_data, input_size

        if test is not None and validation is None:
            x_test = vec.transform(test)
            b_test = CTM.load_bert_data(bert_test_path, test, bert_model)
            test_data = dataset.CTMDataset(
                x_test.toarray(), b_test, idx2token)
            return train_data, test_data, input_size

        if test is None and validation is None:
            return train_data, input_size

    @staticmethod
    def load_bert_data(bert_path, texts, bert_model):
        # reuse cached embeddings from bert_path if present; otherwise
        # compute them with the sentence-transformers model and cache them
        if bert_path is not None:
            if os.path.exists(bert_path):
                bert_output = pkl.load(open(bert_path, 'rb'))
            else:
                bert_output = bert_embeddings_from_list(texts, bert_model)
                pkl.dump(bert_output, open(bert_path, 'wb'))
        else:
            bert_output = bert_embeddings_from_list(texts, bert_model)
        return bert_output
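
A minimal usage sketch (not part of the module above): it assumes the standard OCTIS Dataset loader with one of its bundled corpora, and that train_model returns the usual OCTIS output dictionary whose "topics" entry lists the top words per topic.

from octis.dataset.dataset import Dataset
from octis.models.CTM import CTM

data = Dataset()
data.fetch_dataset("20NewsGroup")

model = CTM(
    num_topics=20,
    inference_type="zeroshot",
    bert_model="bert-base-nli-mean-tokens",
    bert_path="ctm_20ng",        # embeddings cached as ctm_20ng_train.pkl, _test.pkl, _val.pkl
    num_epochs=50,
    use_partitions=True,
    seed=42)

output = model.train_model(data, top_words=10)
print(output["topics"][:3])      # top words of the first three topics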
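
Hyperparameters can also be overridden at training time through the hyperparameters argument of train_model. A brief sketch, continuing the example above: set_params only accepts keys already present in self.hyperparameters, and 'hidden_sizes' is never set directly but re-derived from num_neurons and num_layers.

output = model.train_model(
    data,
    hyperparameters={"num_neurons": 200, "num_layers": 3, "lr": 1e-3},
    top_words=10)
# the inference network now uses hidden_sizes == (200, 200, 200)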
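
The caching in load_bert_data means the contextualized embeddings are computed once per split and reloaded on later runs. A small standalone sketch of that behaviour, mirroring the pickle layout the class uses; the texts and cache file name are illustrative only.

import os
import pickle as pkl
from octis.models.contextualized_topic_models.utils.data_preparation import (
    bert_embeddings_from_list)

texts = ["first document", "second document"]
cache_path = "ctm_20ng_train.pkl"    # hypothetical cache file

if os.path.exists(cache_path):
    embeddings = pkl.load(open(cache_path, "rb"))      # reuse cached embeddings
else:
    embeddings = bert_embeddings_from_list(texts, "bert-base-nli-mean-tokens")
    pkl.dump(embeddings, open(cache_path, "wb"))       # compute once, then cache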