Skip to content

Commit e5a8bb6

Browse files
mzegla and dtrawins
authored
Synchronize entire embeddings calculation phase (#1967)
Due to extensive usage of class member fields — which effectively hold state between calls, or even between phases of processing within a single call — the current synchronization in the embeddings calculation logic is insufficient, leading to inaccurate results when `add_request` is called in parallel from multiple threads. --------- Co-authored-by: Dariusz Trawinski <[email protected]>
1 parent 604b4c9 commit e5a8bb6

File tree

9 files changed

+40
-9
lines changed

9 files changed

+40
-9
lines changed

src/cpp/src/block_manager.hpp

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -550,6 +550,7 @@ class BlockManager {
550550
* @return A vector of blocks (one for each layer) occupied by this sequence for this layer.
551551
*/
552552
const std::vector<KVCacheBlock::Ptr>& get_block_table(uint64_t seq_id, size_t layer_idx) {
553+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
553554
OPENVINO_ASSERT(m_block_table.count(seq_id) == 1);
554555
return m_block_table[seq_id][layer_idx];
555556
}
@@ -570,6 +571,7 @@ class BlockManager {
570571
* @return Number of blocks freed in each sequence in the group.
571572
*/
572573
const size_t free_group_partially(SequenceGroup::Ptr sequence_group, size_t num_required_blocks) {
574+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
573575
size_t blocks_num = std::ceil(num_required_blocks / sequence_group->get_not_finished_sequences().size());
574576
auto not_finished_sequences = sequence_group->get_not_finished_sequences();
575577
for (size_t idx = 0; idx < not_finished_sequences.size(); ++idx) {
@@ -613,6 +615,7 @@ class BlockManager {
613615
}
614616

615617
const size_t free_partially_beam_search_group(SequenceGroup::Ptr sequence_group, size_t num_required_blocks) {
618+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
616619
size_t physical_blocks_released = 0;
617620
size_t logical_blocks_released = 0;
618621
while (num_required_blocks > physical_blocks_released) {
@@ -632,6 +635,7 @@ class BlockManager {
632635
* @return The number of distinct physical blocks occupied by this sequence group.
633636
*/
634637
const size_t get_number_of_blocks_occupied_by_sequence(SequenceGroup::Ptr sequence_group) {
638+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
635639
auto running_sequences = sequence_group->get_not_finished_sequences();
636640
std::set<size_t> indices;
637641
for (size_t idx = 0; idx < running_sequences.size(); ++idx) {
@@ -652,6 +656,7 @@ class BlockManager {
652656
* @return Whether or not this BlockManager is managing this sequence group.
653657
*/
654658
const bool has_block_table(uint64_t seq_id) {
659+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
655660
return m_block_table.count(seq_id) > 0;
656661
}
657662

@@ -766,6 +771,7 @@ class BlockManager {
766771
* other sequences tracked by this BlockManager.
767772
*/
768773
void fork_sequence(uint64_t parent_id, uint64_t child_id) {
774+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
769775
OPENVINO_ASSERT(m_block_table.count(child_id) == 0);
770776
m_block_table[child_id].resize(m_num_layers);
771777
for (size_t layer_idx = 0; layer_idx < m_num_layers; layer_idx++) {
@@ -782,6 +788,7 @@ class BlockManager {
782788
* @param seq_id Identifier of the sequence to free.
783789
*/
784790
void free_sequence(size_t seq_id) {
791+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
785792
OPENVINO_ASSERT(m_block_table.find(seq_id) != m_block_table.end(), "sequence with id ", seq_id,
786793
" not found in BlockManager, but requested to free");
787794
auto& block_table = m_block_table[seq_id];
@@ -846,6 +853,7 @@ class BlockManager {
846853
* @param logical_block_index_sets_to_free Sets (one for each layer) of logical block indices to be freed from this sequence.
847854
*/
848855
void free_blocks_from_sequence(size_t seq_id, const std::vector<std::set<size_t>>& logical_block_index_sets_to_free) {
856+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
849857
std::vector<std::vector<size_t>> logical_block_indices_to_free(logical_block_index_sets_to_free.size());
850858
for (size_t i = 0; i < logical_block_index_sets_to_free.size(); i++) {
851859
const auto& index_set = logical_block_index_sets_to_free[i];
@@ -916,6 +924,7 @@ class BlockManager {
916924
* allocated ones.
917925
*/
918926
size_t required_blocks_count(SequenceGroup::CPtr seq_group) {
927+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
919928
std::vector<Sequence::CPtr> running_sequences = seq_group->get_running_sequences();
920929
size_t blocks_count = 0; // total number of needed blocks for sequence group
921930
std::set<size_t> last_block_ids; // unique last block indices
@@ -973,6 +982,7 @@ class BlockManager {
973982
* @param seq_group Pointer to a sequence group.
974983
*/
975984
void free_empty_physical_blocks(SequenceGroup::Ptr seq_group) {
985+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
976986
size_t num_logical_blocks = seq_group->get_num_logical_blocks();
977987
if (num_logical_blocks == 0) {
978988
return;
@@ -997,6 +1007,7 @@ class BlockManager {
9971007
* indices into which the source block contents should be copied into separately.
9981008
*/
9991009
std::map<size_t, std::list<size_t>> append_slots(SequenceGroup::Ptr seq_group) {
1010+
std::lock_guard<std::mutex> lock(m_cached_blocks_map_mutex);
10001011
// Will always allocate the identical number of new blocks (if any) to each of the "layers" to keep the
10011012
// number of blocks occupied by each "layer" identical at all times.
10021013
size_t num_logical_blocks = seq_group->get_num_logical_blocks();

src/cpp/src/icontinuous_batching.cpp

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -214,8 +214,12 @@ ContinuousBatchingPipeline::IContinuousBatchingPipeline::add_request(uint64_t re
214214
GenerationConfig sampling_params) {
215215
OPENVINO_ASSERT(m_model_input_type == ModelInputType::EMBEDDINGS, "Model doesn't support embeddings.");
216216
ov::genai::VLMPerfMetrics metrics;
217-
m_inputs_embedder->set_apply_chat_template_status(sampling_params.apply_chat_template);
218-
ov::Tensor inputs = m_inputs_embedder->get_inputs_embeds(prompt, rgbs, metrics);
217+
ov::Tensor inputs;
218+
{
219+
std::lock_guard<std::mutex> lock(m_embeddings_mutex);
220+
m_inputs_embedder->set_apply_chat_template_status(sampling_params.apply_chat_template);
221+
inputs = m_inputs_embedder->get_inputs_embeds(prompt, rgbs, metrics);
222+
}
219223
return add_request(request_id, inputs, sampling_params);
220224
}
221225

src/cpp/src/icontinuous_batching.hpp

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -57,6 +57,7 @@ class ContinuousBatchingPipeline::IContinuousBatchingPipeline {
5757

5858
ModelInputType m_model_input_type = ModelInputType::TOKENS;
5959
std::shared_ptr<InputsEmbedder> m_inputs_embedder;
60+
std::mutex m_embeddings_mutex;
6061

6162
void stream_tokens(const std::shared_ptr<ThreadedStreamerWrapper>& streamer_ptr, const GenerationHandle& handle);
6263
public:

src/cpp/src/visual_language/internvl_chat/classes.cpp

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -256,7 +256,9 @@ ov::Tensor InputsEmbedderInternVLChat::get_inputs_embeds(const std::string& prom
256256
ov::Tensor text_embeds = m_embedding->infer(input_ids);
257257

258258
if (images.empty()) {
259-
return text_embeds;
259+
ov::Tensor inputs_embeds(text_embeds.get_element_type(), text_embeds.get_shape());
260+
std::memcpy(inputs_embeds.data(), text_embeds.data(), text_embeds.get_byte_size());
261+
return inputs_embeds;
260262
}
261263
auto start_tokenizer_time = std::chrono::steady_clock::now();
262264
ov::Tensor encoded_image_context_token = m_tokenizer.encode(image_context_token, ov::genai::add_special_tokens(false)).input_ids;

src/cpp/src/visual_language/llava/classes.cpp

Lines changed: 8 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -135,7 +135,9 @@ ov::Tensor InputsEmbedderLLaVA::get_inputs_embeds(const std::string& prompt, con
135135
ov::Tensor text_embeds = m_embedding->infer(input_ids);
136136

137137
if (images.empty()) {
138-
return text_embeds;
138+
ov::Tensor inputs_embeds(text_embeds.get_element_type(), text_embeds.get_shape());
139+
std::memcpy(inputs_embeds.data(), text_embeds.data(), text_embeds.get_byte_size());
140+
return inputs_embeds;
139141
}
140142
auto start_tokenizer_time = std::chrono::steady_clock::now();
141143
ov::Tensor encoded_image_token = m_tokenizer.encode(m_vlm_config.im_start, ov::genai::add_special_tokens(false)).input_ids;
@@ -187,7 +189,11 @@ ov::Tensor InputsEmbedderLLaVA::merge_text_and_image_embeddings_llava(
187189
);
188190
token_offset -= n_tokens + 1;
189191
}
190-
return text_embeds;
192+
// text_embeds is bound to infer request that can be used by another thread after leaving embeddings calculation scope
193+
// so we need to return a copy to make sure data does not get corrupted
194+
ov::Tensor inputs_embeds(text_embeds.get_element_type(), text_embeds.get_shape());
195+
std::memcpy(inputs_embeds.data(), text_embeds.data(), text_embeds.get_byte_size());
196+
return inputs_embeds;
191197
}
192198

193199
} // namespace ov::genai

src/cpp/src/visual_language/llava_next/classes.cpp

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -373,7 +373,9 @@ ov::Tensor InputsEmbedderLLaVANext::get_inputs_embeds(const std::string& prompt,
373373
ov::Tensor text_embeds = m_embedding->infer(input_ids);
374374

375375
if (images.empty()) {
376-
return text_embeds;
376+
ov::Tensor inputs_embeds(text_embeds.get_element_type(), text_embeds.get_shape());
377+
std::memcpy(inputs_embeds.data(), text_embeds.data(), text_embeds.get_byte_size());
378+
return inputs_embeds;
377379
}
378380
auto start_tokenizer_time = std::chrono::steady_clock::now();
379381
ov::Tensor encoded_image_token = m_tokenizer.encode(m_vlm_config.im_start, ov::genai::add_special_tokens(false)).input_ids;

src/cpp/src/visual_language/minicpm/classes.cpp

Lines changed: 5 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -679,7 +679,11 @@ ov::Tensor InputsEmbedderMiniCPM::get_inputs_embeds(const std::string& prompt, c
679679
m_image_id = 0;
680680
m_prev_image_id = 0;
681681
}
682-
return inputs_embeds;
682+
// inputs_embeds is bound to infer request that can be used by another thread after leaving this scope
683+
// so we need to return a copy to make sure data does not get corrupted
684+
ov::Tensor inputs_embeds_copy(inputs_embeds.get_element_type(), inputs_embeds.get_shape());
685+
std::memcpy(inputs_embeds_copy.data(), inputs_embeds.data(), inputs_embeds.get_byte_size());
686+
return inputs_embeds_copy;
683687
}
684688

685689
void InputsEmbedderMiniCPM::update_chat_history(const std::string& decoded_results, const ov::genai::GenerationStatus generation_finish_status) {

src/cpp/src/visual_language/phi3_vision/classes.cpp

Lines changed: 0 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -623,7 +623,6 @@ ov::Tensor InputsEmbedderPhi3V::get_inputs_embeds(const std::string& prompt, con
623623
if (!m_is_chat_conversation) {
624624
m_tokens_per_images.clear();
625625
}
626-
627626
return inputs_embeds;
628627
}
629628

src/cpp/src/visual_language/qwen2vl/classes.cpp

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -338,7 +338,9 @@ ov::Tensor InputsEmbedderQwen2VL::get_inputs_embeds(const std::string& prompt, c
338338
m_image_id = 0;
339339
}
340340
if (images.empty()) {
341-
return text_embeds;
341+
ov::Tensor inputs_embeds(text_embeds.get_element_type(), text_embeds.get_shape());
342+
std::memcpy(inputs_embeds.data(), text_embeds.data(), text_embeds.get_byte_size());
343+
return inputs_embeds;
342344
}
343345

344346
return merge_text_and_image_embeddings_qwen2vl(input_ids, text_embeds, reordered_image_embeds, reordered_images_grid_thw, image_pad_token_id);

0 commit comments

Comments
 (0)