scripts/gte_embedding.py
| 1 | # coding=utf-8 |
| 2 | # Copyright 2024 The GTE Team Authors and Alibaba Group. |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | |
from collections import defaultdict
from typing import Dict, List, Optional, Tuple, Union

import numpy as np
import torch
from transformers import AutoModelForTokenClassification, AutoTokenizer
from transformers.utils import is_torch_npu_available
| 12 | |
| 13 | |
class GTEEmbeddidng(torch.nn.Module):
    """Dense + sparse text embedder built on a token-classification checkpoint.

    The dense embedding of a text is the model's last hidden state at
    position 0, optionally truncated and L2-normalized.  The sparse
    embedding is a ``{token_string: weight}`` mapping derived from the
    ReLU of the per-token logits.

    NOTE: the class name is misspelled ("Embeddidng"); it is kept as-is so
    existing callers keep working.
    """

    def __init__(self,
                 model_name: Optional[str] = None,
                 normalized: bool = True,
                 use_fp16: bool = True,
                 device: Optional[str] = None
                 ):
        """Load the tokenizer and model and move the model to a device.

        Args:
            model_name: Checkpoint name or path passed to ``from_pretrained``.
            normalized: If true, dense embeddings are L2-normalized.
            use_fp16: Load the model weights as ``torch.float16``.  Forced to
                ``False`` when no ``device`` is given and the automatic
                selection falls back to CPU.
            device: Explicit device string.  When falsy, the first available
                of CUDA, MPS, NPU is used, otherwise CPU.
        """
        super().__init__()
        self.normalized = normalized
        if device:
            self.device = torch.device(device)
        else:
            # ``torch.backends.mps`` does not exist on every torch build, so
            # look it up defensively instead of assuming the attribute.
            mps_backend = getattr(torch.backends, "mps", None)
            if torch.cuda.is_available():
                self.device = torch.device("cuda")
            elif mps_backend is not None and mps_backend.is_available():
                self.device = torch.device("mps")
            elif is_torch_npu_available():
                self.device = torch.device("npu")
            else:
                self.device = torch.device("cpu")
                use_fp16 = False
        self.use_fp16 = use_fp16
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForTokenClassification.from_pretrained(
            model_name, trust_remote_code=True, torch_dtype=torch.float16 if self.use_fp16 else None
        )
        self.vocab_size = self.model.config.vocab_size
        self.model.to(self.device)

    def _process_token_weights(self, token_weights: List[float], input_ids: List[int]):
        """Convert per-position weights of one text into a token->weight map.

        Positions holding the tokenizer's CLS/EOS/PAD/UNK ids and positions
        whose weight is not strictly positive are dropped.  When the same
        decoded token occurs several times, the largest weight is kept.

        Args:
            token_weights: One weight per input position.
            input_ids: Token ids aligned with ``token_weights``.

        Returns:
            A ``defaultdict(int)`` mapping decoded token strings to weights.
        """
        special_ids = {
            self.tokenizer.cls_token_id,
            self.tokenizer.eos_token_id,
            self.tokenizer.pad_token_id,
            self.tokenizer.unk_token_id,
        }
        result = defaultdict(int)
        for weight, token_id in zip(token_weights, input_ids):
            if token_id not in special_ids and weight > 0:
                token = self.tokenizer.decode([int(token_id)])
                if weight > result[token]:
                    result[token] = weight
        return result

    @torch.no_grad()
    def encode(self,
               texts: Union[str, List[str]],
               dimension: Optional[int] = None,
               max_length: int = 8192,
               batch_size: int = 16,
               return_dense: bool = True,
               return_sparse: bool = False) -> dict:
        """Embed one text or a list of texts, processing them in batches.

        Args:
            texts: A single string or a list of strings.
            dimension: Number of leading hidden components to keep for the
                dense vectors; defaults to the model's ``hidden_size``.
            max_length: Truncation length handed to the tokenizer.
            batch_size: Number of texts per forward pass; must be positive.
            return_dense: Whether to compute dense embeddings.
            return_sparse: Whether to compute sparse token weights.

        Returns:
            A dict with two keys:
            ``"dense_embeddings"`` - a tensor with one row per text, or
            ``None`` when ``return_dense`` is false;
            ``"token_weights"`` - a list with one token->weight mapping per
            text, or an empty list when ``return_sparse`` is false.

        Raises:
            ValueError: If ``batch_size`` is not positive.
        """
        if batch_size <= 0:
            raise ValueError(f"batch_size must be positive, got {batch_size}")
        if dimension is None:
            dimension = self.model.config.hidden_size
        if isinstance(texts, str):
            texts = [texts]

        dense_batches = []
        all_token_weights = []
        for start in range(0, len(texts), batch_size):
            batch = texts[start: start + batch_size]
            batch_out = self._encode(batch, dimension, max_length, batch_size, return_dense, return_sparse)
            if return_dense:
                dense_batches.append(batch_out['dense_embeddings'])
            if return_sparse:
                all_token_weights.extend(batch_out['token_weights'])

        # torch.cat raises on an empty list, which happens both when dense
        # output was not requested and when ``texts`` is empty.
        if not return_dense:
            all_dense_vecs = None
        elif dense_batches:
            all_dense_vecs = torch.cat(dense_batches, dim=0)
        else:
            all_dense_vecs = torch.empty((0, dimension), device=self.device)

        return {
            "dense_embeddings": all_dense_vecs,
            "token_weights": all_token_weights
        }

    @torch.no_grad()
    def _encode(self,
                texts: Optional[List[str]] = None,
                dimension: Optional[int] = None,
                max_length: int = 1024,
                batch_size: int = 16,
                return_dense: bool = True,
                return_sparse: bool = False):
        """Run one forward pass over a single batch of texts.

        Args:
            texts: The batch of strings to tokenize and embed.
            dimension: Number of leading hidden components kept for the
                dense vectors.
            max_length: Truncation length handed to the tokenizer.
            batch_size: Unused here; kept so the signature stays compatible.
            return_dense: Whether to include ``"dense_embeddings"``.
            return_sparse: Whether to include ``"token_weights"``.

        Returns:
            A dict containing only the requested outputs.
        """
        text_input = self.tokenizer(texts, padding=True, truncation=True, return_tensors='pt', max_length=max_length)
        text_input = {name: tensor.to(self.model.device) for name, tensor in text_input.items()}
        model_out = self.model(**text_input, return_dict=True)

        output = {}
        if return_dense:
            # Hidden state at position 0, truncated to the first `dimension` components.
            dense_vecs = model_out.last_hidden_state[:, 0, :dimension]
            if self.normalized:
                dense_vecs = torch.nn.functional.normalize(dense_vecs, dim=-1)
            output['dense_embeddings'] = dense_vecs
        if return_sparse:
            # Negative logits are clamped to zero; the trailing axis is squeezed away.
            weights = torch.relu(model_out.logits).squeeze(-1)
            weight_rows = weights.detach().cpu().numpy().tolist()
            id_rows = text_input['input_ids'].cpu().numpy().tolist()
            output['token_weights'] = [
                self._process_token_weights(row_weights, row_ids)
                for row_weights, row_ids in zip(weight_rows, id_rows)
            ]

        return output

    def _compute_sparse_scores(self, embs1, embs2):
        """Return the sum of weight products over tokens present in both maps."""
        scores = 0
        for token, weight in embs1.items():
            if token in embs2:
                scores += weight * embs2[token]
        return scores

    def compute_sparse_scores(self, embs1, embs2):
        """Score aligned pairs of token-weight maps.

        Args:
            embs1: Sequence of token->weight mappings.
            embs2: Sequence of token->weight mappings, paired by position
                with ``embs1``.

        Returns:
            A NumPy array with one score per pair.
        """
        scores = [self._compute_sparse_scores(emb1, emb2) for emb1, emb2 in zip(embs1, embs2)]
        return np.array(scores)

    def compute_dense_scores(self, embs1, embs2):
        """Return the row-wise dot product of two embedding tensors as a NumPy array."""
        scores = torch.sum(embs1 * embs2, dim=-1).cpu().detach().numpy()
        return scores

    @torch.no_grad()
    def compute_scores(self,
                       text_pairs: List[Tuple[str, str]],
                       dimension: Optional[int] = None,
                       max_length: int = 1024,
                       batch_size: int = 16,
                       dense_weight=1.0,
                       sparse_weight=0.1):
        """Score text pairs with a weighted sum of dense and sparse scores.

        Args:
            text_pairs: ``(text1, text2)`` tuples to compare.
            dimension: Forwarded to :meth:`encode`.
            max_length: Forwarded to :meth:`encode`.
            batch_size: Forwarded to :meth:`encode`.
            dense_weight: Multiplier applied to the dense score.
            sparse_weight: Multiplier applied to the sparse score.

        Returns:
            A list with one combined score per pair.
        """
        text1_list = [text_pair[0] for text_pair in text_pairs]
        text2_list = [text_pair[1] for text_pair in text_pairs]
        embs1 = self.encode(text1_list, dimension, max_length, batch_size, return_dense=True, return_sparse=True)
        embs2 = self.encode(text2_list, dimension, max_length, batch_size, return_dense=True, return_sparse=True)
        dense_scores = self.compute_dense_scores(embs1['dense_embeddings'], embs2['dense_embeddings'])
        sparse_scores = self.compute_sparse_scores(embs1['token_weights'], embs2['token_weights'])
        scores = dense_scores * dense_weight + sparse_scores * sparse_weight
        scores = scores.tolist()
        return scores
| 142 | |
| 143 | |
if __name__ == '__main__':
    # Demo: embed a few sample sentences and print both representations.
    embedder = GTEEmbeddidng('Alibaba-NLP/gte-multilingual-base')
    sample_docs = [
        "黑龙江离俄罗斯很近",
        "哈尔滨是中国黑龙江省的省会,位于中国东北",
        "you are the hero",
    ]
    print('docs', sample_docs)
    outputs = embedder.encode(sample_docs, return_dense=True, return_sparse=True)
    print('dense vecs', outputs['dense_embeddings'])
    print('sparse vecs', outputs['token_weights'])
| 155 | |