Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion lib/prometheus/client/data_stores/direct_file_store.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
require 'fileutils'
require "cgi"
require 'socket'

module Prometheus
module Client
Expand Down Expand Up @@ -188,7 +189,7 @@ def stores_for_metric
end

def process_id
Process.pid
"#{Socket.gethostname}_#{Process.pid}"
end

def aggregate_values(values)
Expand Down
63 changes: 47 additions & 16 deletions spec/prometheus/client/data_stores/direct_file_store_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,34 @@

context "when process is forked" do
it "opens a new internal store to avoid two processes using the same file" do
allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(12345)
metric_store = subject.for_metric(:metric_name, metric_type: :counter)
metric_store.increment(labels: {}, by: 1)

allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(23456)
metric_store.increment(labels: {}, by: 1)
expect(Dir.glob('/tmp/prometheus_test/*').size).to eq(2)
expect(metric_store.all_values).to eq({} => 2.0)
end
end

context "many processes in different hosts share the same pid" do
it "opens a new internal store to avoid two processes using the same file" do
allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(12345)
metric_store = subject.for_metric(:metric_name, metric_type: :counter)
metric_store.increment(labels: {}, by: 1)

allow(Socket).to receive(:gethostname).and_return("host2")
allow(Process).to receive(:pid).and_return(12345)
metric_store.increment(labels: {}, by: 1)
expect(Dir.glob('/tmp/prometheus_test/*').size).to eq(2)
expect(metric_store.all_values).to eq({} => 2.0)
end
end

it "coalesces values irrespective of the order of labels" do
metric_store1 = subject.for_metric(:metric_name, metric_type: :counter)
metric_store1.increment(labels: { foo: "1", bar: "1" }, by: 1)
Expand All @@ -117,12 +134,14 @@

context "for a non-gauge metric" do
it "sums values from different processes by default" do
allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(12345)
metric_store1 = subject.for_metric(:metric_name, metric_type: :counter)
metric_store1.set(labels: { foo: "bar" }, val: 1)
metric_store1.set(labels: { foo: "baz" }, val: 7)
metric_store1.set(labels: { foo: "yyy" }, val: 3)

allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(23456)
metric_store2 = subject.for_metric(:metric_name, metric_type: :counter)
metric_store2.set(labels: { foo: "bar" }, val: 3)
Expand All @@ -143,6 +162,7 @@

context "for a gauge metric" do
it "exposes each process's individual value by default" do
allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(12345)
metric_store1 = subject.for_metric(
:metric_name,
Expand All @@ -152,6 +172,7 @@
metric_store1.set(labels: { foo: "baz" }, val: 7)
metric_store1.set(labels: { foo: "yyy" }, val: 3)

allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(23456)
metric_store2 = subject.for_metric(
:metric_name,
Expand All @@ -162,19 +183,20 @@
metric_store2.set(labels: { foo: "zzz" }, val: 1)

expect(metric_store1.all_values).to eq(
{ foo: "bar", pid: "12345" } => 1.0,
{ foo: "bar", pid: "23456" } => 3.0,
{ foo: "baz", pid: "12345" } => 7.0,
{ foo: "baz", pid: "23456" } => 2.0,
{ foo: "yyy", pid: "12345" } => 3.0,
{ foo: "zzz", pid: "23456" } => 1.0,
{ foo: "bar", pid: "host1_12345" } => 1.0,
{ foo: "bar", pid: "host1_23456" } => 3.0,
{ foo: "baz", pid: "host1_12345" } => 7.0,
{ foo: "baz", pid: "host1_23456" } => 2.0,
{ foo: "yyy", pid: "host1_12345" } => 3.0,
{ foo: "zzz", pid: "host1_23456" } => 1.0,
)

# Both processes should return the same value
expect(metric_store1.all_values).to eq(metric_store2.all_values)
end

it "coalesces values irrespective of the order of labels" do
allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(12345)
metric_store1 = subject.for_metric(:metric_name, metric_type: :gauge)
metric_store1.set(labels: { foo: "1", bar: "1" }, val: 1)
Expand All @@ -184,16 +206,17 @@
metric_store1.set(labels: { bar: "1", foo: "1" }, val: 10)

expect(metric_store1.all_values).to eq(
{ foo: "1", bar: "1", pid: "12345" } => 10.0,
{ foo: "1", bar: "2", pid: "12345" } => 7.0,
{ foo: "2", bar: "1", pid: "12345" } => 3.0,
{ foo: "1", bar: "1", pid: "host1_12345" } => 10.0,
{ foo: "1", bar: "2", pid: "host1_12345" } => 7.0,
{ foo: "2", bar: "1", pid: "host1_12345" } => 3.0,
)

end
end

context "with a metric that takes MAX instead of SUM" do
it "reports the maximum values from different processes" do
allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(12345)
metric_store1 = subject.for_metric(
:metric_name,
Expand All @@ -204,6 +227,7 @@
metric_store1.set(labels: { foo: "baz" }, val: 7)
metric_store1.set(labels: { foo: "yyy" }, val: 3)

allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(23456)
metric_store2 = subject.for_metric(
:metric_name,
Expand All @@ -228,6 +252,7 @@

context "with a metric that takes MIN instead of SUM" do
it "reports the minimum values from different processes" do
allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(12345)
metric_store1 = subject.for_metric(
:metric_name,
Expand All @@ -238,6 +263,7 @@
metric_store1.set(labels: { foo: "baz" }, val: 7)
metric_store1.set(labels: { foo: "yyy" }, val: 3)

allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(23456)
metric_store2 = subject.for_metric(
:metric_name,
Expand All @@ -262,6 +288,7 @@

context "with a metric that takes ALL instead of SUM" do
it "reports all the values from different processes" do
allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(12345)
metric_store1 = subject.for_metric(
:metric_name,
Expand All @@ -272,6 +299,7 @@
metric_store1.set(labels: { foo: "baz" }, val: 7)
metric_store1.set(labels: { foo: "yyy" }, val: 3)

allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(23456)
metric_store2 = subject.for_metric(
:metric_name,
Expand All @@ -283,12 +311,12 @@
metric_store2.set(labels: { foo: "zzz" }, val: 1)

expect(metric_store1.all_values).to eq(
{ foo: "bar", pid: "12345" } => 1.0,
{ foo: "bar", pid: "23456" } => 3.0,
{ foo: "baz", pid: "12345" } => 7.0,
{ foo: "baz", pid: "23456" } => 2.0,
{ foo: "yyy", pid: "12345" } => 3.0,
{ foo: "zzz", pid: "23456" } => 1.0,
{ foo: "bar", pid: "host1_12345" } => 1.0,
{ foo: "bar", pid: "host1_23456" } => 3.0,
{ foo: "baz", pid: "host1_12345" } => 7.0,
{ foo: "baz", pid: "host1_23456" } => 2.0,
{ foo: "yyy", pid: "host1_12345" } => 3.0,
{ foo: "zzz", pid: "host1_23456" } => 1.0,
)

# Both processes should return the same value
Expand All @@ -309,14 +337,17 @@
metric_settings: { aggregation: :most_recent }
)

allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(12345)
metric_store1.set(labels: { foo: "bar" }, val: 1)

allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(23456)
metric_store2.set(labels: { foo: "bar" }, val: 3) # Supercedes 'bar' in PID 12345
metric_store2.set(labels: { foo: "baz" }, val: 2)
metric_store2.set(labels: { foo: "zzz" }, val: 1)

allow(Socket).to receive(:gethostname).and_return("host1")
allow(Process).to receive(:pid).and_return(12345)
metric_store1.set(labels: { foo: "baz" }, val: 4) # Supercedes 'baz' in PID 23456

Expand Down Expand Up @@ -347,7 +378,7 @@
truncate_calls_count = 0
allow_any_instance_of(Prometheus::Client::DataStores::DirectFileStore::FileMappedDict).
to receive(:resize_file).and_wrap_original do |original_method, *args, &block|

truncate_calls_count += 1
original_method.call(*args, &block)
end
Expand Down