kimi_k25_processor.py
6.7 KB · 166 lines · python Raw
1 from transformers.feature_extraction_utils import BatchFeature
2 from transformers.processing_utils import ProcessorMixin
3 from transformers.utils import logging
4
5 logger = logging.get_logger(__name__)
6
7
class KimiK25Processor(ProcessorMixin):
    r"""
    Constructs a KimiK25 processor which wraps a KimiK25 image processor and a tokenizer into a single processor.

    [`KimiK25Processor`] offers all the functionalities of [`KimiK25ImageProcessor`] and [`TikTokenTokenizer`]. See the
    [`~KimiK25Processor.__call__`] and [`~KimiK25Processor.decode`] for more information.

    Args:
        image_processor ([`KimiK25ImageProcessor`], *optional*):
            The image processor is a required input.
        tokenizer ([`TikTokenTokenizer`], *optional*):
            The tokenizer is a required input.
        chat_template (`str`, *optional*): A Jinja template which will be used to convert lists of messages
            in a chat into a tokenizable string.
    """

    attributes = ["image_processor", "tokenizer"]
    valid_kwargs = ["chat_template"]
    image_processor_class = "AutoImageProcessor"
    tokenizer_class = "AutoTokenizer"

    def __init__(
        self,
        image_processor=None,
        tokenizer=None,
        chat_template=None,
        **kwargs,
    ):
        # NOTE: extra **kwargs are accepted for call-site compatibility but are
        # intentionally not forwarded to ProcessorMixin.__init__.
        super().__init__(image_processor,
                         tokenizer,
                         chat_template=chat_template)
        # Alias used by the media helpers below; it is the same object as the
        # image processor handed to the base class.
        self.media_processor = image_processor
        # A special temporal placeholder to be replaced by actual video placeholders
        self.video_placeholder = "<|kimi_k25_video_placeholder|>"

    def update_raw_text(self, text: str, video_prompts: list[str]) -> str:
        """
        Replace each video placeholder in `text` with its per-video prompt.

        The i-th occurrence of `self.video_placeholder` is substituted by
        `video_prompts[i]`.

        Args:
            text: Text that may contain `self.video_placeholder` occurrences.
            video_prompts: One prompt string per placeholder, in order.

        Returns:
            `text` unchanged when it contains no placeholder, otherwise the
            text with every placeholder substituted.

        Raises:
            ValueError: If the number of placeholders differs from the number
                of video prompts.
        """
        video_count = text.count(self.video_placeholder)
        if video_count == 0:
            return text
        # Raise explicitly instead of asserting: asserts are stripped under
        # `python -O`, and a mismatch would then silently drop text segments.
        if video_count != len(video_prompts):
            raise ValueError(
                f"text contains {video_count} video placeholder(s) but "
                f"{len(video_prompts)} video prompt(s) were provided")
        # Splitting on the placeholder always yields video_count + 1 segments.
        text_parts = text.split(self.video_placeholder)
        pieces = [text_parts[0]]
        for prompt, following_text in zip(video_prompts, text_parts[1:]):
            pieces.append(prompt)
            pieces.append(following_text)
        return "".join(pieces)

    def preprocess_medias(
            self, medias: list[dict]) -> tuple[list[dict], list[str]]:
        """
        Expand video entries into chunks and collect their prompts.

        Image entries are passed through untouched. Each video entry is
        replaced by the chunks returned from
        `self.media_processor.split_video_chunks(media['video'])`, and the
        chunks' `'prompt'` values are concatenated into one prompt per video.

        Args:
            medias: Media dicts whose `'type'` is `'image'` or `'video'`.

        Returns:
            A tuple `(updated_medias, video_prompts)`: the flattened media list
            and one concatenated prompt string per input video, in input order.

        Raises:
            ValueError: If a media entry has an unsupported `'type'`.
        """
        updated_medias = []
        video_prompts = []
        for media in medias:
            media_type = media['type']
            if media_type == 'image':
                updated_medias.append(media)
            elif media_type == 'video':
                video_chunks = self.media_processor.split_video_chunks(
                    media['video'])
                updated_medias.extend(video_chunks)
                video_prompts.append("".join(
                    vc['prompt'] for vc in video_chunks))
            else:
                raise ValueError(f"unsupported media type: {media['type']}")
        return updated_medias, video_prompts

    def __call__(self,
                 messages: list[dict] = None,
                 medias: list[dict] = None,
                 text: str = None,
                 return_tensors: str = "pt",
                 **kwargs) -> BatchFeature:
        """
        Process multimodal inputs for Kimi-K2.5 model.

        This processor accepts ordered messages and extracts both media and text in a single pass.
        The text is automatically updated when video placeholders are present.

        Args:
            messages: List of message dicts with 'role' and 'content' fields.
                Used to derive whichever of `medias` / `text` is not supplied.
            medias: Pre-extracted list of media dicts. If None, extracted from messages.
            text: Pre-formatted text string. If None, generated via apply_chat_template.
            return_tensors: Format of returned tensors ('pt', 'np', 'tf'). Default: 'pt'.
            **kwargs: Additional arguments passed to tokenizer.apply_chat_template.

        Returns:
            BatchFeature merging the tokenizer outputs with the media
            processor outputs (media processor keys win on collision).

        Raises:
            ValueError: If `messages` is None and `medias` or `text` is missing.
        """
        if messages is None and (medias is None or text is None):
            raise ValueError(
                "Provide either 'messages' or both 'medias' and 'text'")

        # Explicit arguments take precedence; `messages` only fills the gaps.
        if medias is None:
            medias = self._extract_medias_from_messages(messages)
        updated_medias, video_prompts = self.preprocess_medias(medias)
        preprocessed = self.media_processor.preprocess(
            updated_medias, return_tensors=return_tensors)

        # Generate text if not provided
        if text is None:
            # NOTE(review): update_raw_text needs a string here, so this relies
            # on the tokenizer's apply_chat_template returning text rather
            # than token ids by default - confirm against the tokenizer.
            text = self.tokenizer.apply_chat_template(messages, **kwargs)

        text = self.update_raw_text(text, video_prompts)

        text_inputs = self.tokenizer(text, return_tensors=return_tensors)
        return BatchFeature(data={**text_inputs, **preprocessed.data})

    @staticmethod
    def _extract_medias_from_messages(messages: list[dict]) -> list[dict]:
        """
        Extract media items from user messages in a single pass.

        Only messages whose role is `'user'` and whose content is non-empty
        are scanned; content parts that are not dicts are ignored.

        Video parts (`'video_url'` / `'video'`) become
        `{'type': 'video', 'video': <source>, 'first_frame_timestamp': 0.0}`
        where `<source>` is the `'url'` entry when the payload is a dict.
        Image parts (`'image_url'` / `'image'`) become
        `{'type': 'image', 'image': <payload>}` with the payload forwarded
        unchanged.

        The payload is read from the `'video_url'` / `'image_url'` key when
        present, otherwise from the key named after the short type
        (`'video'` / `'image'`), so both part spellings are usable.

        Kept as internal method since external callers should use __call__.

        Raises:
            KeyError: If a media part carries neither payload key.
        """
        medias = []
        for msg in messages:
            if msg['role'] != 'user' or not msg.get('content'):
                continue

            for content_part in msg['content']:
                if not isinstance(content_part, dict):
                    continue

                content_type = content_part.get('type')
                if content_type in ('video_url', 'video'):
                    if 'video_url' in content_part:
                        video = content_part['video_url']
                    else:
                        video = content_part['video']
                    # The URL-style payload is a dict wrapping the source.
                    if isinstance(video, dict):
                        video = video['url']
                    medias.append({
                        'type': 'video',
                        'video': video,
                        'first_frame_timestamp': 0.0,
                    })
                elif content_type in ('image_url', 'image'):
                    if 'image_url' in content_part:
                        image = content_part['image_url']
                    else:
                        image = content_part['image']
                    medias.append({
                        'type': 'image',
                        'image': image,
                    })
        return medias

    def apply_chat_template(self, messages, **kwargs):
        """Forward to the tokenizer's `apply_chat_template`."""
        return self.tokenizer.apply_chat_template(messages, **kwargs)

    def batch_decode(self, *args, **kwargs):
        """Forward to the tokenizer's `batch_decode`."""
        return self.tokenizer.batch_decode(*args, **kwargs)

    def decode(self, *args, **kwargs):
        """Forward to the tokenizer's `decode`."""
        return self.tokenizer.decode(*args, **kwargs)

    @property
    def model_input_names(self):
        """Names of the model inputs this processor reports."""
        return ['input_ids', 'attention_mask', 'pixel_values', 'grid_thws']
166