Skip to content

Commit

Permalink
i#6685 core-shard: Add only_shards drmemtrace filter (#6925)
Browse files Browse the repository at this point in the history
Adds a new filter by shard ordinal to the drmemtrace scheduler and
analyzers, and adds multi-thread filtering on the command line.

Adds input_workload_t.only_shards to filter scheduler inputs by ordinal,
which is mostly useful for core-sharded-on-disk traces.

Adds new CLI options -only_shards, which takes a comma-separated list of
ordinals, and -only_threads, which takes a comma-separated list of tids.

Adds sorting of file names when reading inputs from a directory so that
ordinals are reliable. Adds leading 0's to record_filter's output name
so this lexicographic sort keeps cores in order.

Adds error checking that any tid or ordinal filter is actually present
in the inputs.

Adds an internal struct input_read_info_t to implement the new filtering
inside the scheduler.

Adds scheduler unit tests of the new filters.

Adds an end-to-end test of -only_shards on a core-sharded-on-disk trace.

Also tested end-to-end manually with invalid tids, invalid ordinals,
valid tids for core-sharded-on-disk, and valid ordinals for
core-sharded-on-disk.

Issue: #6685
  • Loading branch information
derekbruening authored Aug 22, 2024
1 parent d26a744 commit 068213d
Show file tree
Hide file tree
Showing 12 changed files with 346 additions and 67 deletions.
11 changes: 6 additions & 5 deletions clients/drcachesim/analyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include <iostream>
#include <limits>
#include <memory>
#include <set>
#include <string>
#include <thread>
#include <utility>
Expand Down Expand Up @@ -228,7 +229,8 @@ analyzer_tmpl_t<RecordType, ReaderType>::analyzer_tmpl_t()
template <typename RecordType, typename ReaderType>
bool
analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(
const std::string &trace_path, memref_tid_t only_thread, int verbosity,
const std::string &trace_path, const std::set<memref_tid_t> &only_threads,
const std::set<int> &only_shards, int verbosity,
typename sched_type_t::scheduler_options_t options)
{
verbosity_ = verbosity;
Expand All @@ -245,9 +247,8 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(
regions.emplace_back(skip_instrs_ + 1, 0);
}
typename sched_type_t::input_workload_t workload(trace_path, regions);
if (only_thread != INVALID_THREAD_ID) {
workload.only_threads.insert(only_thread);
}
workload.only_threads = only_threads;
workload.only_shards = only_shards;
if (regions.empty() && skip_to_timestamp_ > 0) {
workload.times_of_interest.emplace_back(skip_to_timestamp_, 0);
}
Expand Down Expand Up @@ -369,7 +370,7 @@ analyzer_tmpl_t<RecordType, ReaderType>::analyzer_tmpl_t(
// The scheduler will call reader_t::init() for each input file. We assume
// that won't block (analyzer_multi_t separates out IPC readers).
typename sched_type_t::scheduler_options_t sched_ops;
if (!init_scheduler(trace_path, INVALID_THREAD_ID, verbosity, std::move(sched_ops))) {
if (!init_scheduler(trace_path, {}, {}, verbosity, std::move(sched_ops))) {
success_ = false;
error_string_ = "Failed to create scheduler";
return;
Expand Down
7 changes: 5 additions & 2 deletions clients/drcachesim/analyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@

#include <iterator>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <utility>
Expand Down Expand Up @@ -216,9 +217,11 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
operator=(const analyzer_worker_data_t &) = delete;
};

// Pass INVALID_THREAD_ID for only_thread to include all threads.
bool
init_scheduler(const std::string &trace_path, memref_tid_t only_thread, int verbosity,
init_scheduler(const std::string &trace_path,
// To include all threads/shards, use empty sets.
const std::set<memref_tid_t> &only_threads,
const std::set<int> &only_shards, int verbosity,
typename sched_type_t::scheduler_options_t options);

// For core-sharded, worker_count_ must be set prior to calling this; for parallel
Expand Down
44 changes: 40 additions & 4 deletions clients/drcachesim/analyzer_multi.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,34 @@ record_analyzer_multi_t::create_analysis_tool_from_options(const std::string &to
* Other analyzer_multi_tmpl_t routines that do not need to be specialized.
*/

template <typename RecordType, typename ReaderType>
std::string
analyzer_multi_tmpl_t<RecordType, ReaderType>::set_input_limit(
std::set<memref_tid_t> &only_threads, std::set<int> &only_shards)
{
bool valid_limit = true;
if (op_only_thread.get_value() != 0) {
if (!op_only_threads.get_value().empty() || !op_only_shards.get_value().empty())
valid_limit = false;
only_threads.insert(op_only_thread.get_value());
} else if (!op_only_threads.get_value().empty()) {
if (!op_only_shards.get_value().empty())
valid_limit = false;
std::vector<std::string> tids = split_by(op_only_threads.get_value(), ",");
for (const std::string &tid : tids) {
only_threads.insert(strtol(tid.c_str(), nullptr, 10));
}
} else if (!op_only_shards.get_value().empty()) {
std::vector<std::string> tids = split_by(op_only_shards.get_value(), ",");
for (const std::string &tid : tids) {
only_shards.insert(strtol(tid.c_str(), nullptr, 10));
}
}
if (!valid_limit)
return "Only one of -only_thread, -only_threads, and -only_shards can be set.";
return "";
}

template <typename RecordType, typename ReaderType>
analyzer_multi_tmpl_t<RecordType, ReaderType>::analyzer_multi_tmpl_t()
{
Expand Down Expand Up @@ -457,7 +485,16 @@ analyzer_multi_tmpl_t<RecordType, ReaderType>::analyzer_multi_tmpl_t()
if (!op_indir.get_value().empty()) {
std::string tracedir =
raw2trace_directory_t::tracedir_from_rawdir(op_indir.get_value());
if (!this->init_scheduler(tracedir, op_only_thread.get_value(),

std::set<memref_tid_t> only_threads;
std::set<int> only_shards;
std::string res = set_input_limit(only_threads, only_shards);
if (!res.empty()) {
this->success_ = false;
this->error_string_ = res;
return;
}
if (!this->init_scheduler(tracedir, only_threads, only_shards,
op_verbose.get_value(), std::move(sched_ops)))
this->success_ = false;
} else if (op_infile.get_value().empty()) {
Expand All @@ -479,9 +516,8 @@ analyzer_multi_tmpl_t<RecordType, ReaderType>::analyzer_multi_tmpl_t()
}
} else {
// Legacy file.
if (!this->init_scheduler(op_infile.get_value(),
INVALID_THREAD_ID /*all threads*/,
op_verbose.get_value(), std::move(sched_ops)))
if (!this->init_scheduler(op_infile.get_value(), {}, {}, op_verbose.get_value(),
std::move(sched_ops)))
this->success_ = false;
}
if (!init_analysis_tools()) {
Expand Down
3 changes: 3 additions & 0 deletions clients/drcachesim/analyzer_multi.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ class analyzer_multi_tmpl_t : public analyzer_tmpl_t<RecordType, ReaderType> {
cache_simulator_knobs_t *
get_cache_simulator_knobs();

std::string
set_input_limit(std::set<memref_tid_t> &only_threads, std::set<int> &only_shards);

std::unique_ptr<std::istream> serial_schedule_file_;
// This is read in a single stream by invariant_checker and so is not
// an archive_istream_t.
Expand Down
20 changes: 19 additions & 1 deletion clients/drcachesim/common/options.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,25 @@ droption_t<int>
op_only_thread(DROPTION_SCOPE_FRONTEND, "only_thread", 0,
"Only analyze this thread (0 means all)",
"Limits analyis to the single "
"thread with the given identifier. 0 enables all threads.");
"thread with the given identifier. 0 enables all threads. "
"Applies only to -indir, not to -infile. "
"Cannot be combined with -only_threads or -only_shards.");

droption_t<std::string>
op_only_threads(DROPTION_SCOPE_FRONTEND, "only_threads", "",
"Only analyze these comma-separated threads",
"Limits analyis to the list of comma-separated thread ids. "
"Applies only to -indir, not to -infile. "
"Cannot be combined with -only_thread or -only_shards.");
droption_t<std::string>
op_only_shards(DROPTION_SCOPE_FRONTEND, "only_shards", "",
"Only analyze these comma-separated shard ordinals",
"Limits analyis to the list of comma-separated shard ordinals. "
"A shard is typically an input thread but might be a core for "
"core-sharded-on-disk traces. The ordinal is 0-based and indexes "
"into the sorted order of input filenames. "
"Applies only to -indir, not to -infile. "
"Cannot be combined with -only_thread or -only_threads.");

droption_t<bytesize_t> op_skip_instrs(
DROPTION_SCOPE_FRONTEND, "skip_instrs", 0, "Number of instructions to skip",
Expand Down
2 changes: 2 additions & 0 deletions clients/drcachesim/common/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,8 @@ extern dynamorio::droption::droption_t<dynamorio::droption::bytesize_t>
extern dynamorio::droption::droption_t<dynamorio::droption::bytesize_t>
op_interval_instr_count;
extern dynamorio::droption::droption_t<int> op_only_thread;
extern dynamorio::droption::droption_t<std::string> op_only_threads;
extern dynamorio::droption::droption_t<std::string> op_only_shards;
extern dynamorio::droption::droption_t<dynamorio::droption::bytesize_t> op_skip_instrs;
extern dynamorio::droption::droption_t<dynamorio::droption::bytesize_t> op_skip_refs;
extern dynamorio::droption::droption_t<uint64_t> op_skip_to_timestamp;
Expand Down
106 changes: 83 additions & 23 deletions clients/drcachesim/scheduler/scheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -664,6 +664,33 @@ scheduler_tmpl_t<RecordType, ReaderType>::stream_t::set_active(bool active)
* Scheduler.
*/

template <typename RecordType, typename ReaderType>
bool
scheduler_tmpl_t<RecordType, ReaderType>::check_valid_input_limits(
const input_workload_t &workload, input_reader_info_t &reader_info)
{
if (!workload.only_shards.empty()) {
for (input_ordinal_t ord : workload.only_shards) {
if (ord < 0 || ord >= static_cast<input_ordinal_t>(reader_info.input_count)) {
error_string_ = "only_shards entry " + std::to_string(ord) +
" out of bounds for a shard ordinal";
return false;
}
}
}
if (!workload.only_threads.empty()) {
for (memref_tid_t tid : workload.only_threads) {
if (reader_info.unfiltered_tids.find(tid) ==
reader_info.unfiltered_tids.end()) {
error_string_ = "only_threads entry " + std::to_string(tid) +
" not found in workload inputs";
return false;
}
}
}
return true;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_tmpl_t<RecordType, ReaderType>::init(
Expand All @@ -679,16 +706,26 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
auto &workload = workload_inputs[workload_idx];
if (workload.struct_size != sizeof(input_workload_t))
return STATUS_ERROR_INVALID_PARAMETER;
std::unordered_map<memref_tid_t, int> workload_tids;
if (!workload.only_threads.empty() && !workload.only_shards.empty())
return STATUS_ERROR_INVALID_PARAMETER;
input_reader_info_t reader_info;
reader_info.only_threads = workload.only_threads;
reader_info.only_shards = workload.only_shards;
if (workload.path.empty()) {
if (workload.readers.empty())
return STATUS_ERROR_INVALID_PARAMETER;
for (auto &reader : workload.readers) {
reader_info.input_count = workload.readers.size();
for (int i = 0; i < static_cast<int>(workload.readers.size()); ++i) {
auto &reader = workload.readers[i];
if (!reader.reader || !reader.end)
return STATUS_ERROR_INVALID_PARAMETER;
reader_info.unfiltered_tids.insert(reader.tid);
if (!workload.only_threads.empty() &&
workload.only_threads.find(reader.tid) == workload.only_threads.end())
continue;
if (!workload.only_shards.empty() &&
workload.only_shards.find(i) == workload.only_shards.end())
continue;
int index = static_cast<input_ordinal_t>(inputs_.size());
inputs_.emplace_back();
input_info_t &input = inputs_.back();
Expand All @@ -699,22 +736,24 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
input.reader = std::move(reader.reader);
input.reader_end = std::move(reader.end);
input.needs_init = true;
workload_tids[input.tid] = input.index;
reader_info.tid2input[input.tid] = input.index;
tid2input_[workload_tid_t(workload_idx, input.tid)] = index;
}
} else {
if (!workload.readers.empty())
return STATUS_ERROR_INVALID_PARAMETER;
sched_type_t::scheduler_status_t res =
open_readers(workload.path, workload.only_threads, workload_tids);
open_readers(workload.path, reader_info);
if (res != STATUS_SUCCESS)
return res;
for (const auto &it : workload_tids) {
for (const auto &it : reader_info.tid2input) {
inputs_[it.second].workload = workload_idx;
workload2inputs[workload_idx].push_back(it.second);
tid2input_[workload_tid_t(workload_idx, it.first)] = it.second;
}
}
if (!check_valid_input_limits(workload, reader_info))
return STATUS_ERROR_INVALID_PARAMETER;
if (!workload.times_of_interest.empty()) {
for (const auto &modifiers : workload.thread_modifiers) {
if (!modifiers.regions_of_interest.empty()) {
Expand All @@ -723,7 +762,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
}
}
sched_type_t::scheduler_status_t status =
create_regions_from_times(workload_tids, workload);
create_regions_from_times(reader_info.tid2input, workload);
if (status != sched_type_t::STATUS_SUCCESS)
return STATUS_ERROR_INVALID_PARAMETER;
}
Expand All @@ -734,7 +773,7 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
std::vector<memref_tid_t> workload_tid_vector;
if (modifiers.tids.empty()) {
// Apply to all tids that have not already been modified.
for (const auto entry : workload_tids) {
for (const auto entry : reader_info.tid2input) {
if (!inputs_[entry.second].has_modifier)
workload_tid_vector.push_back(entry.first);
}
Expand All @@ -744,9 +783,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::init(
// We assume the overhead of copying the modifiers for every thread is
// not high and the simplified code is worthwhile.
for (memref_tid_t tid : *which_tids) {
if (workload_tids.find(tid) == workload_tids.end())
if (reader_info.tid2input.find(tid) == reader_info.tid2input.end())
return STATUS_ERROR_INVALID_PARAMETER;
int index = workload_tids[tid];
int index = reader_info.tid2input[tid];
input_info_t &input = inputs_[index];
input.has_modifier = true;
input.binding = modifiers.output_binding;
Expand Down Expand Up @@ -1788,9 +1827,9 @@ scheduler_tmpl_t<RecordType, ReaderType>::get_initial_input_content(

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_tmpl_t<RecordType, ReaderType>::open_reader(
const std::string &path, const std::set<memref_tid_t> &only_threads,
std::unordered_map<memref_tid_t, int> &workload_tids)
scheduler_tmpl_t<RecordType, ReaderType>::open_reader(const std::string &path,
input_ordinal_t input_ordinal,
input_reader_info_t &reader_info)
{
if (path.empty() || directory_iterator_t::is_directory(path))
return STATUS_ERROR_INVALID_PARAMETER;
Expand All @@ -1803,10 +1842,13 @@ scheduler_tmpl_t<RecordType, ReaderType>::open_reader(
inputs_.emplace_back();
input_info_t &input = inputs_.back();
input.index = index;
// We need the tid up front. Rather than assume it's still part of the filename, we
// read the first record (we generalize to read until we find the first but we
// We need the tid up front. Rather than assume it's still part of the filename,
// we read the first record (we generalize to read until we find the first but we
// expect it to be the first after PR #5739 changed the order file_reader_t passes
// them to reader_t) to find it.
// XXX: For core-sharded-on-disk traces, this tid is just the first one for
// this core; it would be better to read the filetype and not match any tid
// for such files? Should we call get_initial_input_content() to do that?
std::unique_ptr<ReaderType> reader_end = get_default_reader();
memref_tid_t tid = INVALID_THREAD_ID;
while (*reader != *reader_end) {
Expand All @@ -1820,32 +1862,44 @@ scheduler_tmpl_t<RecordType, ReaderType>::open_reader(
error_string_ = "Failed to read " + path;
return STATUS_ERROR_FILE_READ_FAILED;
}
if (!only_threads.empty() && only_threads.find(tid) == only_threads.end()) {
// For core-sharded inputs that start idle the tid might be IDLE_THREAD_ID.
// That means the size of unfiltered_tids will not be the total input
// size, which is why we have a separate input_count.
reader_info.unfiltered_tids.insert(tid);
++reader_info.input_count;
if (!reader_info.only_threads.empty() &&
reader_info.only_threads.find(tid) == reader_info.only_threads.end()) {
inputs_.pop_back();
return sched_type_t::STATUS_SUCCESS;
}
if (!reader_info.only_shards.empty() &&
reader_info.only_shards.find(input_ordinal) == reader_info.only_shards.end()) {
inputs_.pop_back();
return sched_type_t::STATUS_SUCCESS;
}
VPRINT(this, 1, "Opened reader for tid %" PRId64 " %s\n", tid, path.c_str());
input.tid = tid;
input.reader = std::move(reader);
input.reader_end = std::move(reader_end);
workload_tids[tid] = index;
reader_info.tid2input[tid] = index;
return sched_type_t::STATUS_SUCCESS;
}

template <typename RecordType, typename ReaderType>
typename scheduler_tmpl_t<RecordType, ReaderType>::scheduler_status_t
scheduler_tmpl_t<RecordType, ReaderType>::open_readers(
const std::string &path, const std::set<memref_tid_t> &only_threads,
std::unordered_map<memref_tid_t, int> &workload_tids)
scheduler_tmpl_t<RecordType, ReaderType>::open_readers(const std::string &path,
input_reader_info_t &reader_info)
{
if (!directory_iterator_t::is_directory(path))
return open_reader(path, only_threads, workload_tids);
if (!directory_iterator_t::is_directory(path)) {
return open_reader(path, 0, reader_info);
}
directory_iterator_t end;
directory_iterator_t iter(path);
if (!iter) {
error_string_ = "Failed to list directory " + path + ": " + iter.error_string();
return sched_type_t::STATUS_ERROR_FILE_OPEN_FAILED;
}
std::vector<std::string> files;
for (; iter != end; ++iter) {
const std::string fname = *iter;
if (fname == "." || fname == ".." ||
Expand All @@ -1858,8 +1912,14 @@ scheduler_tmpl_t<RecordType, ReaderType>::open_readers(
fname == DRMEMTRACE_ENCODING_FILENAME)
continue;
const std::string file = path + DIRSEP + fname;
sched_type_t::scheduler_status_t res =
open_reader(file, only_threads, workload_tids);
files.push_back(file);
}
// Sort so we can have reliable shard ordinals for only_shards.
// We assume leading 0's are used for important numbers embedded in the path,
// so that a regular sort keeps numeric order.
std::sort(files.begin(), files.end());
for (int i = 0; i < static_cast<int>(files.size()); ++i) {
sched_type_t::scheduler_status_t res = open_reader(files[i], i, reader_info);
if (res != sched_type_t::STATUS_SUCCESS)
return res;
}
Expand Down
Loading

0 comments on commit 068213d

Please sign in to comment.