A Detailed Look at Implementing the TF-IDF Ranking Algorithm in Java

RAR file | 1.82 MB | Updated 2024-10-25
TF-IDF is used to evaluate how important a word is to a single document within a document collection or corpus. In the TF-IDF scheme, a word's importance grows with how often it appears in the document (TF) and shrinks with how often it appears across the corpus (IDF). The algorithm is widely used for keyword weighting in search engines, document classification, and information retrieval. This resource focuses on implementing the TF-IDF ranking algorithm in Java with the Lucene library.

Knowledge Point 1: How the TF-IDF algorithm works
TF-IDF has two components: term frequency (TF) and inverse document frequency (IDF). Term frequency is how often a term appears in a particular document. Inverse document frequency measures how informative a term is: it is the logarithm of the total number of documents divided by the number of documents that contain the term, so it decreases as more documents contain the term. Combined, the two reflect how important a keyword is to a document relative to the whole collection. The formula is TF-IDF(t, d) = TF(t, d) * IDF(t), where IDF(t) = log(N / df(t)), N is the number of documents in the corpus, and df(t) is the number of documents containing term t. A plain-Java sketch of this computation follows at the end of this article.

Knowledge Point 2: The Lucene search engine framework
Lucene is an efficient, extensible full-text search library written in Java. It provides a complete API that lets developers add search functionality to their own applications, and it can index and search large volumes of text efficiently. In this resource, Lucene is used to process the document data and to drive the TF-IDF computation and document ranking.

Knowledge Point 3: Implementing TF-IDF ranking in Java
A Java implementation starts by preprocessing the document collection, including tokenization and stop-word removal. The TF-IDF weight of each term is then computed; during index construction, each document's terms and their TF-IDF values are stored. At search time, TF-IDF values are computed for the query terms as well, matching documents are retrieved from the index, and the results are ranked by the TF-IDF scores of the query terms in each document. A hedged Lucene-based sketch of this workflow also appears at the end of this article.

Knowledge Point 4: Code implementation and optimization
A concrete implementation of TF-IDF ranking involves several parts, including but not limited to:
1. reading and preprocessing the document collection;
2. tokenization and term-frequency counting;
3. stop-word filtering;
4. computing inverse document frequency;
5. computing and storing TF-IDF values;
6. query processing and result ranking;
7. optimizing search efficiency.
In practice there are further technical details, such as choosing data structures that store TF-IDF weights for fast lookup and algorithmic optimizations that reduce the amount of computation, both of which raise overall search performance.

Knowledge Point 5: Application scenarios
TF-IDF ranking is used in many areas, such as keyword ranking in search engines, automatic summarization, information filtering, and recommender systems. It helps separate informative words from common ones, supporting more accurate and efficient retrieval and improving the quality of search results and user satisfaction.

Knowledge Point 6: Limitations and future directions
Although TF-IDF works well in many scenarios, it has limitations: it ignores word order and context and does not capture relationships between words. For complex data it is therefore often combined with other techniques, such as word embeddings. With the progress of deep learning, neural models such as the pretrained language models BERT and GPT have made strong advances in understanding and generating text; hybrid approaches that combine TF-IDF with such models are likely to remain an active direction for research and application.
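The following is a minimal, self-contained sketch of the TF-IDF computation from Knowledge Point 1 in plain Java (no Lucene). It is illustrative only: the class and method names (TfIdfExample, tf, idf, tfIdf) are hypothetical, the documents are tiny hard-coded token lists, and TF is computed as a length-normalized raw count, which is one of several common TF variants. Java 9+ is assumed for List.of.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TfIdfExample {

    // TF(t, d): count of term t in document d, normalized by document length.
    static double tf(String term, List<String> doc) {
        long count = doc.stream().filter(term::equals).count();
        return (double) count / doc.size();
    }

    // IDF(t) = log(N / df(t)), where df(t) is the number of documents containing t.
    static double idf(String term, List<List<String>> corpus) {
        long df = corpus.stream().filter(d -> d.contains(term)).count();
        return Math.log((double) corpus.size() / df);
    }

    // TF-IDF(t, d) = TF(t, d) * IDF(t)
    static double tfIdf(String term, List<String> doc, List<List<String>> corpus) {
        return tf(term, doc) * idf(term, corpus);
    }

    public static void main(String[] args) {
        List<List<String>> corpus = new ArrayList<>();
        corpus.add(List.of("java", "implements", "tfidf", "ranking"));
        corpus.add(List.of("lucene", "is", "a", "java", "search", "library"));
        corpus.add(List.of("tfidf", "weights", "terms", "in", "a", "document"));

        // Score every term of the first document against the whole corpus.
        Map<String, Double> scores = new HashMap<>();
        for (String term : corpus.get(0)) {
            scores.put(term, tfIdf(term, corpus.get(0), corpus));
        }
        System.out.println(scores);
    }
}

Running main prints a score map for the first document; rarer terms such as "ranking" receive a higher weight than terms such as "java" that occur in more documents, which is exactly the behavior the formula is meant to produce.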

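Below is a hedged sketch of the Lucene-based workflow described in Knowledge Points 2 and 3: index a few documents, then rank search results with Lucene's classic TF-IDF scoring. It assumes Lucene 8.x with the lucene-core and lucene-queryparser modules on the classpath; ClassicSimilarity is Lucene's TF-IDF-style Similarity, and ByteBuffersDirectory is an in-memory index used only to keep the example self-contained. The exact setup may differ in other Lucene versions, so treat this as a starting point rather than the resource's actual implementation.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.TextField;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.queryparser.classic.QueryParser;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.similarities.ClassicSimilarity;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.store.Directory;

public class LuceneTfIdfSearch {
    public static void main(String[] args) throws Exception {
        Directory dir = new ByteBuffersDirectory();          // in-memory index
        StandardAnalyzer analyzer = new StandardAnalyzer();  // tokenization + stop-word removal

        // Index a few sample documents using classic (TF-IDF) scoring.
        IndexWriterConfig config = new IndexWriterConfig(analyzer);
        config.setSimilarity(new ClassicSimilarity());
        try (IndexWriter writer = new IndexWriter(dir, config)) {
            for (String text : new String[] {
                    "tf idf ranking implemented in java",
                    "lucene is a full text search library written in java",
                    "tf idf weights terms by frequency and rarity"}) {
                Document doc = new Document();
                doc.add(new TextField("content", text, Field.Store.YES));
                writer.addDocument(doc);
            }
        }

        // Search: results are ranked by the TF-IDF based score of the query terms.
        try (DirectoryReader reader = DirectoryReader.open(dir)) {
            IndexSearcher searcher = new IndexSearcher(reader);
            searcher.setSimilarity(new ClassicSimilarity());  // must match the indexing similarity
            Query query = new QueryParser("content", analyzer).parse("tf idf java");
            TopDocs hits = searcher.search(query, 10);
            for (ScoreDoc hit : hits.scoreDocs) {
                System.out.println(searcher.doc(hit.doc).get("content") + "  score=" + hit.score);
            }
        }
    }
}

Setting the same Similarity on both the IndexWriterConfig and the IndexSearcher matters: index-time statistics and query-time scoring must agree for the ranking to reflect the intended TF-IDF weights.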
Related recommendations

I get the error "ValueError: np.nan is an invalid document, expected byte or unicode string." How should the following code be modified?

import pandas as pd
from sklearn.feature_extraction.text import CountVectorizer, TfidfVectorizer
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Read the movie-review dataset
data = pd.read_csv(r'D:\shujukexue\review_data.csv', encoding='gbk')

x = v.fit_transform(df['eview'].apply(lambda x: np.str_(x)))

# Split the dataset into training and test sets
X_train, X_test, y_train, y_test = train_test_split(data['review'], data['sentiment'], test_size=0.2, random_state=42)

# Create a CountVectorizer for term counting and vectorization
count_vectorizer = CountVectorizer()
X_train_count = count_vectorizer.fit_transform(X_train)
X_test_count = count_vectorizer.transform(X_test)

# Create a TfidfVectorizer for TF-IDF computation and vectorization
tfidf_vectorizer = TfidfVectorizer()
X_train_tfidf = tfidf_vectorizer.fit_transform(X_train)
X_test_tfidf = tfidf_vectorizer.transform(X_test)

# Train and evaluate a logistic-regression classifier on the CountVectorizer features
classifier_count = LogisticRegression()
classifier_count.fit(X_train_count, y_train)
y_pred_count = classifier_count.predict(X_test_count)
accuracy_count = accuracy_score(y_test, y_pred_count)
print("Accuracy using CountVectorizer:", accuracy_count)

# Train and evaluate a logistic-regression classifier on the TfidfVectorizer features
classifier_tfidf = LogisticRegression()
classifier_tfidf.fit(X_train_tfidf, y_train)
y_pred_tfidf = classifier_tfidf.predict(X_test_tfidf)
accuracy_tfidf = accuracy_score(y_test, y_pred_tfidf)
print("Accuracy using TfidfVectorizer:", accuracy_tfidf)

import os
import re
import time
import torch
import torch.nn as nn
import torch.optim as optim
import pandas as pd
import numpy as np
from datetime import datetime
from torch.utils.data import Dataset, DataLoader, random_split
from tqdm import tqdm
import pickle
import mysql.connector
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from gensim.models import Word2Vec

# SpaCy replaces nltk's slower text processing
import spacy
nlp = spacy.load("en_core_web_sm", disable=["parser", "ner"])

# Use gensim's Word2Vec and KeyedVectors
from gensim.models import Word2Vec, TfidfModel
from gensim.corpora import Dictionary

# Custom preprocessing (fast version)
STOPWORDS = spacy.lang.en.stop_words.STOP_WORDS

def clean_text(text):
    return re.sub(r'[^a-zA-Z0-9\s]', '', str(text)).strip().lower()

def tokenize(text):
    doc = nlp(clean_text(text))
    return [token.text for token in doc if token.text not in STOPWORDS and token.text.isalnum()]

def preprocess(text):
    tokens = tokenize(text)
    return " ".join(tokens)

class SemanticMatchModel(nn.Module):
    def __init__(self, input_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 256)
        self.bn1 = nn.BatchNorm1d(256)
        self.fc2 = nn.Linear(256, 128)
        self.bn2 = nn.BatchNorm1d(128)
        self.fc3 = nn.Linear(128, 64)
        self.bn3 = nn.BatchNorm1d(64)
        self.fc4 = nn.Linear(64, 1)
        self.dropout = nn.Dropout(0.3)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = self.relu(self.bn1(self.fc1(x)))
        x = self.dropout(x)
        x = self.relu(self.bn2(self.fc2(x)))
        x = self.dropout(x)
        x = self.relu(self.bn3(self.fc3(x)))
        x = self.dropout(x)
        x = self.sigmoid(self.fc4(x))
        return x

class QADataset(Dataset):
    """
    Dataset: combines positive samples (question, answer) with random negative
    samples (question, random_answer); positives get label=1, negatives label=0.
    """
    def __init__(self, qa_pairs, tfidf_vectorizer, negative_ratio=1.0):
        """
        :param qa_pairs: [(question_text, answer_text), ...]
        :param tfidf_vectorizer: an already-fitted TfidfVectorizer
        :param negative_ratio: number of negative samples per positive sample
        """
        self.qa_pairs = qa_pairs
        self.vectorizer = tfidf_vectorizer
        self.samples = []
        # Build positive samples
        for i, (q, a) in enumerate(self.qa_pairs):
            self.samples.append((q, a, 1))  # label=1
        # Build negative samples: randomly replace the answer
        if negative_ratio > 0:
            negative_samples = []
            total_pairs = len(self.qa_pairs)
            for i, (q, a) in enumerate(self.qa_pairs):
                for _ in range(int(negative_ratio)):
                    rand_idx = np.random.randint(total_pairs)
                    # If the same QA pair is drawn, sample again
                    while rand_idx == i:
                        rand_idx = np.random.randint(total_pairs)
                    neg_q, neg_a = self.qa_pairs[rand_idx]
                    # Keep the question, swap in a random answer
                    negative_samples.append((q, neg_a, 0))
            self.samples.extend(negative_samples)

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        q, a, label = self.samples[idx]
        q_vec = self.vectorizer.transform([preprocess(q)]).toarray()[0]
        a_vec = self.vectorizer.transform([preprocess(a)]).toarray()[0]
        pair_vec = np.concatenate((q_vec, a_vec))
        return torch.tensor(pair_vec, dtype=torch.float32), torch.tensor(label, dtype=torch.float32)

class KnowledgeBase:
    def __init__(self, host='localhost', user='root', password='hy188747', database='ubuntu_qa',
                 table='qa_pair', model_dir=r"D:\NLP-PT\PT4\model", negative_ratio=1.0):
        print("🔄 Initializing knowledge base...")
        self.host = host
        self.user = user
        self.password = password
        self.database = database
        self.table = table
        self.model_dir = model_dir
        self.negative_ratio = negative_ratio
        # Make sure the model directory exists
        os.makedirs(self.model_dir, exist_ok=True)
        self.qa_pairs = []
        self.q_texts = []
        self.a_texts = []
        self.semantic_model = None
        self.word2vec_model = None
        self.tfidf_vectorizer = None
        self.tfidf_matrix = None
        # Step 1: load data from the database
        self.load_data_from_mysql()
        # Step 2: load or cache the preprocessed texts
        self.load_or_cache_processed_questions()
        # Step 3: load TF-IDF and vectorize
        self.load_cached_tfidf()
        # Step 4: load Word2Vec or use the cache
        self.load_cached_word2vec_model()
        # Step 5: load the PyTorch model
        model_path = os.path.join(self.model_dir, 'semantic_match_model.pth')
        if os.path.exists(model_path):
            self.load_model()

    def load_data_from_mysql(self):
        print("🔄 Connecting to MySQL and loading QA data...")
        conn = mysql.connector.connect(
            host=self.host, user=self.user, password=self.password, database=self.database
        )
        cursor = conn.cursor()
        query = f"SELECT question_text, answer_text FROM {self.table}"
        cursor.execute(query)
        rows = cursor.fetchall()
        conn.close()
        self.qa_pairs = [(row[0], row[1]) for row in rows]
        self.q_texts = [pair[0] for pair in self.qa_pairs]
        self.a_texts = [pair[1] for pair in self.qa_pairs]
        print(f"✅ Loaded {len(self.qa_pairs)} QA pairs from MySQL.")

    def load_or_cache_processed_questions(self):
        """Use a local cache to avoid re-preprocessing large amounts of data on every run"""
        cache_path = os.path.join(self.model_dir, 'processed_questions.pkl')
        if os.path.exists(cache_path):
            print("🔄 Using cached preprocessed (tokenized) texts.")
            with open(cache_path, 'rb') as f:
                self.processed_q_list = pickle.load(f)
        else:
            print("🔄 Preprocessing question texts (slow on first run)...")
            self.processed_q_list = [preprocess(q) for q in self.q_texts]
            with open(cache_path, 'wb') as f:
                pickle.dump(self.processed_q_list, f)
            print("✅ Preprocessing cache saved.")

    def load_cached_tfidf(self):
        """Load an existing TfidfVectorizer, or build one"""
        cache_tfidf_matrix = os.path.join(self.model_dir, 'tfidf_matrix.npz')
        cache_qa_list = os.path.join(self.model_dir, 'tfidf_qa.pkl')
        tfidf_path = os.path.join(self.model_dir, 'tfidf_vectorizer.pkl')
        if os.path.exists(tfidf_path) and os.path.exists(cache_tfidf_matrix) and os.path.exists(cache_qa_list):
            print("🔄 Loading cached TF-IDF artifacts.")
            import joblib
            self.tfidf_vectorizer = joblib.load(tfidf_path)
            self.tfidf_matrix = np.load(cache_tfidf_matrix)['tfidf']
            with open(cache_qa_list, 'rb') as f:
                self.tfidf_qa = pickle.load(f)
        else:
            print("🔄 Building TF-IDF (slow on first run)...")
            self.tfidf_vectorizer = TfidfVectorizer(
                tokenizer=lambda x: x.split(),
                lowercase=False,
                max_features=10000
            )
            self.tfidf_qa = self.processed_q_list
            self.tfidf_matrix = self.tfidf_vectorizer.fit_transform(self.tfidf_qa).toarray()
            print("✅ TF-IDF built.")
            import joblib
            joblib.dump(self.tfidf_vectorizer, tfidf_path)
            np.savez_compressed(cache_tfidf_matrix, tfidf=self.tfidf_matrix)
            with open(cache_qa_list, 'wb') as f:
                pickle.dump(self.tfidf_qa, f)

    def load_cached_word2vec_model(self):
        """Load a trained Word2Vec model, or train one if missing"""
        word2vec_path = os.path.join(self.model_dir, 'word2vec.model')
        if os.path.exists(word2vec_path):
            print("🔄 Loading cached Word2Vec model...")
            self.word2vec_model = Word2Vec.load(word2vec_path)
        else:
            print("🔄 Training Word2Vec model (slow on first run)...")
            tokenized_questions = [preprocess(q).split() for q in self.q_texts]
            self.word2vec_model = Word2Vec(
                sentences=tokenized_questions,
                vector_size=100,
                window=5,
                min_count=1,
                workers=4
            )
            self.word2vec_model.save(word2vec_path)
            print("✅ Word2Vec model trained and saved.")

    def sentence_to_vec(self, sentence):
        """Convert a sentence into a vector representation"""
        tokens = preprocess(sentence).split()
        if self.word2vec_model:
            vecs = [self.word2vec_model.wv[w] for w in tokens if w in self.word2vec_model.wv]
            return np.mean(vecs, axis=0) if vecs else np.zeros(self.word2vec_model.vector_size)
        else:
            # Without a Word2Vec model, fall back to the TF-IDF vector
            return self.tfidf_vectorizer.transform([preprocess(sentence)]).toarray()[0]

    def build_model(self, epochs=10, batch_size=128, lr=1e-3):
        """
        Build and train the semantic matching model, including a train/validation
        split and performance monitoring.
        """
        # Build the dataset
        full_dataset = QADataset(self.qa_pairs, self.tfidf_vectorizer, negative_ratio=self.negative_ratio)
        # Train/validation split
        train_size = int(len(full_dataset) * 0.8)
        val_size = len(full_dataset) - train_size
        train_dataset, val_dataset = random_split(full_dataset, [train_size, val_size])
        # Data loaders
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)
        # Initialize the model
        sample_input, _ = full_dataset[0]
        input_dim = sample_input.shape[0]
        self.semantic_model = SemanticMatchModel(input_dim)
        criterion = nn.BCELoss()
        optimizer = optim.Adam(self.semantic_model.parameters(), lr=lr)
        scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.9)
        # Train the model
        best_val_acc = 0.0
        print("\nStarting model training...")
        start_time = time.time()
        for epoch in range(epochs):
            self.semantic_model.train()
            total_loss, total_correct, total_samples = 0.0, 0, 0
            for X_batch, y_batch in tqdm(train_loader, desc=f"Epoch {epoch + 1}/{epochs} - training"):
                optimizer.zero_grad()
                outputs = self.semantic_model(X_batch).squeeze()
                loss = criterion(outputs, y_batch)
                loss.backward()
                optimizer.step()
                total_loss += loss.item() * len(y_batch)
                preds = (outputs >= 0.5).float()
                total_correct += (preds == y_batch).sum().item()
                total_samples += len(y_batch)
            train_loss = total_loss / total_samples
            train_acc = total_correct / total_samples
            # Validation phase
            self.semantic_model.eval()
            val_loss, val_correct, val_samples = 0.0, 0, 0
            with torch.no_grad():
                for X_val, y_val in val_loader:
                    outputs_val = self.semantic_model(X_val).squeeze()
                    loss_val = criterion(outputs_val, y_val)
                    val_loss += loss_val.item() * len(y_val)
                    preds_val = (outputs_val >= 0.5).float()
                    val_correct += (preds_val == y_val).sum().item()
                    val_samples += len(y_val)
            val_loss /= val_samples
            val_acc = val_correct / val_samples
            # Update the learning rate
            scheduler.step()
            print(f"Epoch [{epoch + 1}/{epochs}] | "
                  f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")
            # Save the best model
            if val_acc > best_val_acc:
                best_val_acc = val_acc
                model_path = os.path.join(self.model_dir, 'semantic_match_model.pth')
                torch.save(self.semantic_model.state_dict(), model_path)
                print(f"✅ New best model saved (Val Acc: {best_val_acc:.4f})")
        end_time = time.time()
        print(f"\nTraining finished in {end_time - start_time:.2f} seconds.")
        # Load the best model weights
        model_path = os.path.join(self.model_dir, 'semantic_match_model.pth')
        self.semantic_model.load_state_dict(torch.load(model_path))
        self.semantic_model.eval()

    def load_model(self):
        """Load the trained semantic matching PyTorch model"""
        input_dim = self.tfidf_matrix.shape[1] * 2
        model_path = os.path.join(self.model_dir, 'semantic_match_model.pth')
        self.semantic_model = SemanticMatchModel(input_dim)
        self.semantic_model.load_state_dict(torch.load(model_path, map_location='cpu'))
        self.semantic_model.eval()
        print("✅ Semantic matching model loaded.")

    def retrieve(self, query, semantic_topk=100):
        """
        Retrieval interface: a coarse pass scored with TF-IDF + sentence vectors,
        then the top-K results are reranked with the semantic model; returns the
        best-matching QA pair.
        """
        # Coarse retrieval
        query_tfidf = self.tfidf_vectorizer.transform([preprocess(query)]).toarray()[0]
        tfidf_scores = cosine_similarity([query_tfidf], self.tfidf_matrix).flatten()
        query_sent_vec = self.sentence_to_vec(query)
        sent_vecs = np.array([self.sentence_to_vec(q) for q in self.q_texts])
        sent_scores = cosine_similarity([query_sent_vec], sent_vecs).flatten()
        sim_scores = tfidf_scores + sent_scores
        topk_indices = np.argpartition(sim_scores, -semantic_topk)[-semantic_topk:]
        topk_indices = topk_indices[np.argsort(sim_scores[topk_indices])[::-1]]
        # Fine-grained reranking
        if self.semantic_model:
            with torch.no_grad():
                batch_inputs = []
                for i in topk_indices:
                    q = preprocess(self.q_texts[i])
                    a = preprocess(self.a_texts[i])
                    q_vec = self.tfidf_vectorizer.transform([q]).toarray()[0]
                    a_vec = self.tfidf_vectorizer.transform([a]).toarray()[0]
                    pair_input = np.concatenate((q_vec, a_vec))
                    batch_inputs.append(pair_input)
                batch_inputs = torch.tensor(np.stack(batch_inputs), dtype=torch.float32)
                batch_scores = self.semantic_model(batch_inputs).squeeze().cpu().numpy()
                semantic_scores = batch_scores
            # Combined score
            final_scores = sim_scores[topk_indices] + semantic_scores
            best_idx = topk_indices[np.argmax(final_scores)]
            return self.qa_pairs[best_idx], final_scores.max()
        else:
            # Without the semantic model, use only the coarse results
            best_idx = topk_indices[0]
            return self.qa_pairs[best_idx], sim_scores[best_idx]

    def recommend_similar(self, query, topk=3):
        """Recommend similar questions when no answer was matched"""
        query_tfidf = self.tfidf_vectorizer.transform([preprocess(query)]).toarray()[0]
        scores = cosine_similarity([query_tfidf], self.tfidf_matrix).flatten()
        # Indices of the top-k most similar questions
        topk_idx = scores.argsort()[-topk:][::-1]
        return [(self.qa_pairs[i][0], self.qa_pairs[i][1]) for i in topk_idx]

class FeedbackRecorder:
    """Record unanswered questions"""
    def __init__(self, file_path='unanswered_questions.csv'):
        self.file_path = file_path
        if not os.path.exists(self.file_path):
            with open(self.file_path, 'w', newline='', encoding='utf-8') as f:
                import csv
                csv.writer(f).writerow(['time', 'question'])

    def record_question(self, question):
        with open(self.file_path, 'a', newline='', encoding='utf-8') as f:
            import csv
            writer = csv.writer(f)
            writer.writerow([datetime.now().isoformat(), question])

def main():
    kb = KnowledgeBase(
        host='localhost',
        user='root',
        password='hy188747',
        database='ubuntu_qa',
        table='qa_pair',
        model_dir=r"D:\NLP-PT\PT4\model",
        negative_ratio=1.0
    )
    # Optionally retrain the semantic matching model
    if input("Retrain the semantic matching model? (y/n): ").strip().lower() == 'y':
        kb.build_model(
            epochs=5,        # number of training epochs
            batch_size=128,  # batch size
            lr=1e-3          # learning rate
        )
    recorder = FeedbackRecorder()
    print("\n🎯 Intelligent QA system started (type 'q' to quit)\n")
    while True:
        query = input("🧐 Question: ")
        if query.strip().lower() == 'q':
            break
        result, score = kb.retrieve(query)
        if result:
            print("💡 Answer:", result[1])
            print(f"📊 Match confidence: {score:.4f}\n")
        else:
            print("⚠ No suitable answer found; your question has been recorded.")
            recorder.record_question(query)
            print("🔥 Similar question suggestions:")
            for q, a in kb.recommend_similar(query):
                print(f"Q: {q}\nA: {a}\n")

if __name__ == "__main__":
    main()