Skip to content

Commit

Permalink
Initial commit for using global command queue
Browse files Browse the repository at this point in the history
Signed-off-by: Bogdan Pereanu <[email protected]>
  • Loading branch information
pereanub committed Jan 24, 2025
1 parent 174869c commit 20fd791
Show file tree
Hide file tree
Showing 11 changed files with 242 additions and 89 deletions.
14 changes: 12 additions & 2 deletions src/plugins/intel_npu/src/backend/include/zero_pipeline.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ struct Pipeline {

Pipeline(const Pipeline&) = delete;
Pipeline& operator=(const Pipeline&) = delete;
virtual ~Pipeline() = default;
~Pipeline();

void push();
void pull();
Expand All @@ -40,6 +40,7 @@ struct Pipeline {
void closeCommandListIndex(size_t command_list_index);

protected:
std::shared_ptr<ZeroInitStructsHolder> _init_structs;
std::shared_ptr<IGraph> _graph;
const Config _config;
const uint32_t _id;
Expand All @@ -59,9 +60,18 @@ struct Pipeline {
std::vector<std::unique_ptr<Fence>> _fences;
std::shared_ptr<EventPool> _event_pool;
std::vector<std::shared_ptr<Event>> _events;
bool sync_output_with_fences_ = true;
bool _sync_output_with_fences = true;
std::shared_ptr<zeroProfiling::NpuInferProfiling> _npu_profiling;
Logger _logger;

CommandQueueFactory _command_queue_factory;
uint32_t _group_ordinal;
bool _fences_are_created = false;
std::mutex _mutex;

bool _turbo = false;
ze_command_queue_priority_t _ze_queue_priority;
std::optional<ze_command_queue_workload_type_t> _ze_workload_type = std::nullopt;
};

} // namespace intel_npu
68 changes: 56 additions & 12 deletions src/plugins/intel_npu/src/backend/src/zero_pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ Pipeline::Pipeline(const Config& config,
const std::vector<std::vector<std::shared_ptr<ov::ITensor>>>& input_tensors,
const std::vector<std::shared_ptr<ov::ITensor>>& output_tensors,
uint32_t group_ordinal)
: _graph(graph),
: _init_structs(init_structs),
_graph(graph),
_config(config),
_id(_graph->get_unique_id()),
_number_of_command_lists(_graph->get_batch_size().has_value() ? *_graph->get_batch_size() : 1),
Expand All @@ -35,25 +36,31 @@ Pipeline::Pipeline(const Config& config,
init_structs->getContext(),
_number_of_command_lists ? static_cast<uint32_t>(_number_of_command_lists) : 1)},
_npu_profiling(npu_profiling),
_logger("Pipeline", _config.get<LOG_LEVEL>()) {
_logger("Pipeline", _config.get<LOG_LEVEL>()),
_group_ordinal(group_ordinal) {
OV_ITT_SCOPED_TASK(itt::domains::LevelZeroBackend, "Zero_infer_request::Pipeline::Pipeline");
_logger.debug("Pipeline - initialize started");

if (profiling_pool.create()) {
profiling_query.create(profiling_pool._handle);
}

if (_config.has<TURBO>()) {
_turbo = _config.get<TURBO>();
}

_ze_queue_priority = zeroUtils::toZeQueuePriority(_config.get<MODEL_PRIORITY>());

_command_lists.reserve(_number_of_command_lists);
_events.reserve(_number_of_command_lists);
_fences.reserve(_number_of_command_lists);
_fences.resize(_number_of_command_lists);
_logger.debug("Pipeline - emplace_back _event_pool and _command_queue");
for (size_t i = 0; i < _number_of_command_lists; i++) {
_command_lists.emplace_back(
std::make_unique<CommandList>(init_structs,
group_ordinal,
init_structs->getMutableCommandListVersion() ? true : false));
_events.emplace_back(std::make_shared<Event>(_event_pool, static_cast<uint32_t>(i)));
_fences.emplace_back(std::make_unique<Fence>(*_graph->get_command_queue()));
}

for (size_t i = 0; i < _number_of_command_lists; i++) {
Expand Down Expand Up @@ -138,7 +145,7 @@ Pipeline::Pipeline(const Config& config,
}

// appendBarrier used in L0 as well
if (!sync_output_with_fences_) {
if (!_sync_output_with_fences) {
_command_lists.at(i)->appendBarrier();
_events.at(i)->AppendSignalEvent(*_command_lists.at(i));
}
Expand All @@ -150,6 +157,38 @@ Pipeline::Pipeline(const Config& config,
void Pipeline::push() {
_logger.debug("Pipeline - push() started");

{
std::lock_guard<std::mutex> lock(_mutex);

_command_queue = _command_queue_factory.getCommandQueue(_init_structs,
_ze_queue_priority,
_graph->get_ze_workload_type(),
_group_ordinal,
_turbo);

if (_ze_workload_type != _graph->get_ze_workload_type()) {
_command_queue_factory.freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo);

_ze_workload_type = _graph->get_ze_workload_type();

if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
_fences[i] = std::make_unique<Fence>(*_command_queue);
}
}
}

if (!_fences_are_created) {
if (_sync_output_with_fences) {
for (size_t i = 0; i < _number_of_command_lists; i++) {
_fences[i] = std::make_unique<Fence>(*_command_queue);
}
}

_fences_are_created = true;
}
}

if (_config.get<RUN_INFERENCES_SEQUENTIALLY>()) {
if (_id) {
auto previousIndex = _graph->get_last_submitted_id();
Expand All @@ -164,10 +203,10 @@ void Pipeline::push() {

for (size_t i = 0; i < _command_lists.size(); ++i) {
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PUSH, itt::domains::LevelZeroBackend, "Pipeline", "push");
if (sync_output_with_fences_) {
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i), *_fences.at(i));
if (_sync_output_with_fences) {
_command_queue->executeCommandList(*_command_lists.at(i), *_fences.at(i));
} else {
_graph->get_command_queue()->executeCommandList(*_command_lists.at(i));
_command_queue->executeCommandList(*_command_lists.at(i));
}
}

Expand All @@ -179,7 +218,7 @@ void Pipeline::pull() {
OV_ITT_TASK_CHAIN(ZERO_PIPELINE_IP_PULL, itt::domains::LevelZeroBackend, "Pipeline", "pull");

for (size_t i = 0; i < _command_lists.size(); ++i) {
if (sync_output_with_fences_) {
if (_sync_output_with_fences) {
_fences.at(i)->hostSynchronize();
} else {
_events.at(i)->hostSynchronize();
Expand All @@ -194,17 +233,17 @@ void Pipeline::pull() {
};

void Pipeline::reset() const {
_logger.debug("Pipeline - rest() started");
_logger.debug("Pipeline - reset() started");

for (size_t i = 0; i < _command_lists.size(); ++i) {
if (sync_output_with_fences_) {
if (_sync_output_with_fences) {
_fences.at(i)->reset();
} else {
_events.at(i)->reset();
}
}

_logger.debug("Pipeline - rest() completed");
_logger.debug("Pipeline - reset() completed");
};

void Pipeline::updateCommandList(uint32_t arg_index, const void* arg_data, size_t byte_size) {
Expand Down Expand Up @@ -257,4 +296,9 @@ void Pipeline::closeCommandListIndex(size_t command_list_index) {
_command_lists.at(command_list_index)->close();
};

// Tears down this pipeline's use of the shared (global) command queue.
// The local shared_ptr is released first, then the factory is told to drop
// its cached entry for this (priority, workload type, turbo) combination.
// NOTE(review): the order looks intentional — resetting _command_queue before
// freeCommandQueue() lets the factory destroy the queue if this pipeline was
// the last holder; confirm against CommandQueueFactory's ownership contract.
Pipeline::~Pipeline() {
_command_queue.reset();
_command_queue_factory.freeCommandQueue(_ze_queue_priority, _ze_workload_type, _turbo);
}

} // namespace intel_npu
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,8 @@ class IGraph : public std::enable_shared_from_this<IGraph> {

const std::vector<ArgumentDescriptor>& get_input_descriptors() const;
const std::vector<ArgumentDescriptor>& get_output_descriptors() const;
const std::shared_ptr<CommandQueue>& get_command_queue() const;

void set_workload_type(const ov::WorkloadType workloadType) const;
void set_workload_type(const ov::WorkloadType workloadType);

std::mutex& get_mutex();

Expand All @@ -55,6 +54,7 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
uint32_t get_last_submitted_id() const;

const std::optional<std::size_t> get_batch_size() const;
const std::optional<ze_command_queue_workload_type_t> get_ze_workload_type() const;

protected:
/**
Expand Down Expand Up @@ -82,7 +82,6 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
std::vector<ArgumentDescriptor> _input_descriptors;
std::vector<ArgumentDescriptor> _output_descriptors;

std::shared_ptr<CommandQueue> _command_queue;
std::vector<std::shared_ptr<Event>> _last_submitted_event;

// Used to protect zero pipeline creation in the graph. The pipeline should be created only once per graph when the
Expand All @@ -100,6 +99,8 @@ class IGraph : public std::enable_shared_from_this<IGraph> {
*/
std::optional<std::size_t> _batch_size = std::nullopt;

std::optional<ze_command_queue_workload_type_t> _ze_workload_type = std::nullopt;

Logger _logger;
};

Expand Down
32 changes: 10 additions & 22 deletions src/plugins/intel_npu/src/common/src/igraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ IGraph::IGraph(ze_graph_handle_t handle,
if (blob.has_value()) {
_blob = std::move(*blob);
}

if (config.has<WORKLOAD_TYPE>()) {
set_workload_type(config.get<WORKLOAD_TYPE>());
}
}

const NetworkMetadata& IGraph::get_metadata() const {
Expand All @@ -46,28 +50,8 @@ const std::vector<ArgumentDescriptor>& IGraph::get_output_descriptors() const {
return _output_descriptors;
}

const std::shared_ptr<CommandQueue>& IGraph::get_command_queue() const {
return _command_queue;
}

void IGraph::set_workload_type(const ov::WorkloadType workloadType) const {
if (_command_queue == nullptr) {
return;
}

ze_command_queue_workload_type_t zeWorkloadType;
switch (workloadType) {
case ov::WorkloadType::DEFAULT:
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_DEFAULT;
break;
case ov::WorkloadType::EFFICIENT:
zeWorkloadType = ze_command_queue_workload_type_t::ZE_WORKLOAD_TYPE_BACKGROUND;
break;
default:
OPENVINO_THROW("Unknown value for WorkloadType!");
}

_command_queue->setWorkloadType(zeWorkloadType);
// Records the requested workload type as its Level Zero equivalent.
// Unlike the previous implementation, this no longer touches a command queue
// directly: the cached value is read later (via get_ze_workload_type()) by the
// pipeline when it acquires a queue from the global command-queue factory.
// Conversion/validation (including throwing on unknown values) is delegated to
// zeroUtils::toZeQueueWorkloadType — presumably it mirrors the old switch that
// mapped DEFAULT/EFFICIENT and threw otherwise; verify its error behavior.
void IGraph::set_workload_type(const ov::WorkloadType workloadType) {
_ze_workload_type = zeroUtils::toZeQueueWorkloadType(workloadType);
}

std::mutex& IGraph::get_mutex() {
Expand Down Expand Up @@ -156,4 +140,8 @@ const std::optional<std::size_t> IGraph::get_batch_size() const {
return _batch_size;
}

// Returns the cached Level Zero workload type, or std::nullopt when no
// WORKLOAD_TYPE was ever configured for this graph (see set_workload_type and
// the WORKLOAD_TYPE check in the IGraph constructor).
const std::optional<ze_command_queue_workload_type_t> IGraph::get_ze_workload_type() const {
return _ze_workload_type;
}

} // namespace intel_npu
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ class ZeGraphExtWrappers {

void setGraphArgumentValue(ze_graph_handle_t graphHandle, uint32_t argi_, const void* argv) const;

void initializeGraph(ze_graph_handle_t graphHandle, const Config& config) const;
void initializeGraph(ze_graph_handle_t graphHandle) const;

private:
std::unordered_set<std::string> getQueryResultFromSupportedLayers(
Expand All @@ -60,7 +60,7 @@ class ZeGraphExtWrappers {
std::vector<IODescriptor>& inputs,
std::vector<IODescriptor>& outputs) const;

void initialize_graph_through_command_list(ze_graph_handle_t graphHandle, const Config& config) const;
void initialize_graph_through_command_list(ze_graph_handle_t graphHandle) const;

std::shared_ptr<ZeroInitStructsHolder> _zeroInitStruct;
uint32_t _graphExtVersion;
Expand Down
17 changes: 1 addition & 16 deletions src/plugins/intel_npu/src/compiler_adapter/src/driver_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,23 +103,8 @@ void DriverGraph::initialize(const Config& config) {
deviceProperties.stype = ZE_STRUCTURE_TYPE_DEVICE_PROPERTIES;
THROW_ON_FAIL_FOR_LEVELZERO("zeDeviceGetProperties",
zeDeviceGetProperties(_zeroInitStruct->getDevice(), &deviceProperties));
auto groupOrdinal = zeroUtils::findGroupOrdinal(_zeroInitStruct->getDevice(), deviceProperties);

bool turbo = false;
if (config.has<TURBO>()) {
turbo = config.get<TURBO>();
}

_command_queue = std::make_shared<CommandQueue>(_zeroInitStruct,
zeroUtils::toZeQueuePriority(config.get<MODEL_PRIORITY>()),
groupOrdinal,
turbo);

if (config.has<WORKLOAD_TYPE>()) {
set_workload_type(config.get<WORKLOAD_TYPE>());
}

_zeGraphExt->initializeGraph(_handle, config);
_zeGraphExt->initializeGraph(_handle);

_logger.debug("Graph initialize finish");

Expand Down
17 changes: 1 addition & 16 deletions src/plugins/intel_npu/src/compiler_adapter/src/plugin_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,23 +98,8 @@ void PluginGraph::initialize(const Config& config) {
deviceProperties.stype = ZE_STRUCTURE_TYPE_DEVICE_PROPERTIES;
THROW_ON_FAIL_FOR_LEVELZERO("zeDeviceGetProperties",
zeDeviceGetProperties(_zeroInitStruct->getDevice(), &deviceProperties));
auto groupOrdinal = zeroUtils::findGroupOrdinal(_zeroInitStruct->getDevice(), deviceProperties);

bool turbo = false;
if (config.has<TURBO>()) {
turbo = config.get<TURBO>();
}

_command_queue = std::make_shared<CommandQueue>(_zeroInitStruct,
zeroUtils::toZeQueuePriority(config.get<MODEL_PRIORITY>()),
groupOrdinal,
turbo);

if (config.has<WORKLOAD_TYPE>()) {
set_workload_type(config.get<WORKLOAD_TYPE>());
}

_zeGraphExt->initializeGraph(_handle, config);
_zeGraphExt->initializeGraph(_handle);

if (config.get<BATCH_MODE>() != ov::intel_npu::BatchMode::COMPILER) {
_batch_size = get_batch_size(_metadata);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,10 @@ void ZeGraphExtWrappers::setGraphArgumentValue(ze_graph_handle_t graphHandle, ui
THROW_ON_FAIL_FOR_LEVELZERO_EXT("zeGraphSetArgumentValue", result, _zeroInitStruct->getGraphDdiTable());
}

void ZeGraphExtWrappers::initializeGraph(ze_graph_handle_t graphHandle, const Config& config) const {
void ZeGraphExtWrappers::initializeGraph(ze_graph_handle_t graphHandle) const {
if (_zeroInitStruct->getGraphDdiTable().version() < ZE_GRAPH_EXT_VERSION_1_8) {
_logger.debug("Use initialize_graph_through_command_list for ext version smaller than 1.8");
initialize_graph_through_command_list(graphHandle, config);
initialize_graph_through_command_list(graphHandle);
} else {
_logger.debug("Initialize graph based on graph properties for ext version larger than 1.8");
ze_graph_properties_2_t properties = {};
Expand All @@ -177,13 +177,12 @@ void ZeGraphExtWrappers::initializeGraph(ze_graph_handle_t graphHandle, const Co
}

if (properties.initStageRequired & ZE_GRAPH_STAGE_COMMAND_LIST_INITIALIZE) {
initialize_graph_through_command_list(graphHandle, config);
initialize_graph_through_command_list(graphHandle);
}
}
}

void ZeGraphExtWrappers::initialize_graph_through_command_list(ze_graph_handle_t graphHandle,
const Config& config) const {
void ZeGraphExtWrappers::initialize_graph_through_command_list(ze_graph_handle_t graphHandle) const {
ze_device_properties_t deviceProperties = {};
deviceProperties.stype = ZE_STRUCTURE_TYPE_DEVICE_PROPERTIES;
THROW_ON_FAIL_FOR_LEVELZERO("zeDeviceGetProperties",
Expand Down
Loading

0 comments on commit 20fd791

Please sign in to comment.