Skip to content

Commit ee922f8

Browse files
author
Steve Freegard
committed
Initial commit
0 parents  commit ee922f8

14 files changed

+1243
-0
lines changed

ExampleEmails.java

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
import java.io.IOException;
2+
import java.nio.file.Files;
3+
import java.nio.file.Path;
4+
import java.text.SimpleDateFormat;
5+
import java.util.Base64;
6+
import java.util.Date;
7+
import java.util.HashMap;
8+
import java.util.HashSet;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.Set;
12+
13+
import io.vertx.core.Vertx;
14+
import io.vertx.core.json.JsonObject;
15+
import io.vertx.ext.stomp.Frame;
16+
import io.vertx.ext.stomp.StompClient;
17+
import io.vertx.ext.stomp.StompClientConnection;
18+
import io.vertx.ext.stomp.StompClientOptions;
19+
20+
public class ExampleEmails {
21+
22+
public static final String SERVER = "stream.abusix.net:61612";
23+
public static final boolean SSL = true;
24+
public static final String CREDENTIALS = "<user>:<pass>";
25+
public static final String TOPIC = "<topic>";
26+
27+
public static final boolean WRITE_RAW_EMAILS = false;
28+
29+
public static final SimpleDateFormat OUTPUT_DIR_FORMAT = new SimpleDateFormat("'filtered'/yyyy-MM-dd/HH/");
30+
31+
public static final Map<String, List<String>> ALLOWLIST_FILTERS = new HashMap<>();
32+
public static final Map<String, List<String>> BLOCKLIST_FILTERS = new HashMap<>();
33+
34+
public static final Set<String> NON_EMPTY_FILTERS = new HashSet<>();
35+
36+
static {
37+
/*
38+
Possible fields:
39+
- data_colorcode
40+
- data_origin
41+
- detected_text_language
42+
- source_ip
43+
- source_ip_country_iso
44+
- source_ip_rir
45+
- source_port
46+
*/
47+
// ALLOWLIST_FILTERS.put("data_colorcode", Arrays.asList("black"));
48+
// BLOCKLIST_FILTERS.put("detected_text_language", Arrays.asList("jp", "kr"));
49+
50+
/*
51+
Possible fields:
52+
- email_urls
53+
- data_origin
54+
- email_attachment_content_types
55+
- email_attachment_file_names
56+
- email_attachment_tags
57+
*/
58+
// NON_EMPTY_FILTERS.add("email_urls");
59+
}
60+
61+
public static void onMessage(Frame frame) {
62+
JsonObject json = frame.getBody().toJsonObject();
63+
64+
for (Map.Entry<String, List<String>> entry : ALLOWLIST_FILTERS.entrySet()) {
65+
String field = entry.getKey();
66+
List<String> allowed = entry.getValue();
67+
if (!allowed.contains(json.getString(field))) {
68+
return;
69+
}
70+
}
71+
72+
for (Map.Entry<String, List<String>> entry : BLOCKLIST_FILTERS.entrySet()) {
73+
String field = entry.getKey();
74+
List<String> forbidden = entry.getValue();
75+
if (forbidden.contains(json.getString(field))) {
76+
return;
77+
}
78+
}
79+
80+
for (String field : NON_EMPTY_FILTERS) {
81+
if (json.getString(field).isEmpty()) {
82+
return;
83+
}
84+
}
85+
86+
Path target = Path.of(OUTPUT_DIR_FORMAT.format(new Date()));
87+
if (!target.toFile().exists()) {
88+
target.toFile().mkdirs();
89+
}
90+
91+
try {
92+
if (WRITE_RAW_EMAILS) {
93+
Files.write(target.resolve(json.hashCode() + ".eml"),
94+
Base64.getDecoder().decode(json.getString("original_message_base64_encoded")));
95+
} else {
96+
Files.write(target.resolve(json.hashCode() + ".json"),
97+
json.toString().getBytes());
98+
}
99+
} catch (IOException e) {
100+
System.err.println("Could not write file.");
101+
e.printStackTrace();
102+
}
103+
}
104+
105+
public static void listen() {
106+
StompClientOptions options = new StompClientOptions();
107+
options.setHeartbeat(new JsonObject().put("x", 10000).put("y", 10000));
108+
options.setHost(SERVER.split(":")[0]);
109+
options.setPort(Integer.parseInt(SERVER.split(":")[1]));
110+
options.setSsl(SSL);
111+
options.setLogin(CREDENTIALS.split(":")[0]);
112+
options.setPasscode(CREDENTIALS.split(":")[1]);
113+
114+
StompClient client = StompClient.create(Vertx.vertx(), options);
115+
client.connect(ar -> {
116+
if (ar.succeeded()) {
117+
StompClientConnection conn = ar.result();
118+
119+
Map<String, String> headers = new HashMap<>();
120+
headers.put("id", "1234"); // something unique
121+
122+
// To disable load-balancing (shared subscriptions):
123+
//headers.put("channel", CREDENTIALS.split(":")[0] + "something_unique");
124+
125+
// Subscribe to topic
126+
conn.subscribe(TOPIC, headers, ExampleEmails::onMessage);
127+
}
128+
});
129+
client.close();
130+
}
131+
132+
public static void main(String[] args) {
133+
listen();
134+
}
135+
}

ExampleFilesJSON.java

Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
import java.io.IOException;
2+
import java.nio.file.Files;
3+
import java.nio.file.Path;
4+
import java.text.SimpleDateFormat;
5+
import java.util.Base64;
6+
import java.util.Date;
7+
import java.util.HashMap;
8+
import java.util.HashSet;
9+
import java.util.List;
10+
import java.util.Map;
11+
import java.util.Set;
12+
13+
import io.vertx.core.Vertx;
14+
import io.vertx.core.json.JsonObject;
15+
import io.vertx.ext.stomp.Frame;
16+
import io.vertx.ext.stomp.StompClient;
17+
import io.vertx.ext.stomp.StompClientConnection;
18+
import io.vertx.ext.stomp.StompClientOptions;
19+
20+
public class ExampleFilesJSON {
21+
22+
public static final String SERVER = "stream.abusix.net:61612";
23+
public static final boolean SSL = true;
24+
public static final String CREDENTIALS = "<user>:<pass>";
25+
public static final String TOPIC = "<topic>";
26+
27+
public static final boolean WRITE_RAW_FILES = true;
28+
29+
public static final SimpleDateFormat OUTPUT_DIR_FORMAT = new SimpleDateFormat("'filtered'/yyyy-MM-dd/HH/");
30+
31+
public static final Map<String, List<String>> ALLOWLIST_FILTERS = new HashMap<>();
32+
public static final Map<String, List<String>> BLOCKLIST_FILTERS = new HashMap<>();
33+
34+
static {
35+
/*
36+
Possible fields:
37+
- content_type
38+
- data_origin
39+
- detected_extension
40+
- filename
41+
- smtp_mail_from
42+
- source_ip
43+
- source_ip_country_iso
44+
- source_ip_rir
45+
- source_port
46+
*/
47+
// ALLOWLIST_FILTERS.put("filename", Arrays.asList("attachment.txt"));
48+
// BLOCKLIST_FILTERS.put("detected_extension", Arrays.asList("exe"));
49+
}
50+
51+
public static void onMessage(Frame frame) {
52+
JsonObject json = frame.getBody().toJsonObject();
53+
54+
for (Map.Entry<String, List<String>> entry : ALLOWLIST_FILTERS.entrySet()) {
55+
String field = entry.getKey();
56+
List<String> allowed = entry.getValue();
57+
if (!allowed.contains(json.getString(field))) {
58+
return;
59+
}
60+
}
61+
62+
for (Map.Entry<String, List<String>> entry : BLOCKLIST_FILTERS.entrySet()) {
63+
String field = entry.getKey();
64+
List<String> forbidden = entry.getValue();
65+
if (forbidden.contains(json.getString(field))) {
66+
return;
67+
}
68+
}
69+
70+
Path target = Path.of(OUTPUT_DIR_FORMAT.format(new Date()));
71+
if (!target.toFile().exists()) {
72+
target.toFile().mkdirs();
73+
}
74+
75+
try {
76+
if (WRITE_RAW_FILES) {
77+
Files.write(target.resolve(json.hashCode() + ".file"),
78+
Base64.getDecoder().decode(json.getString("attachment_base64_encoded")));
79+
} else {
80+
Files.write(target.resolve(json.hashCode() + ".json"),
81+
json.toString().getBytes());
82+
}
83+
} catch (IOException e) {
84+
System.err.println("Could not write file.");
85+
e.printStackTrace();
86+
}
87+
}
88+
89+
public static void listen() {
90+
StompClientOptions options = new StompClientOptions();
91+
options.setHeartbeat(new JsonObject().put("x", 10000).put("y", 10000));
92+
options.setHost(SERVER.split(":")[0]);
93+
options.setPort(Integer.parseInt(SERVER.split(":")[1]));
94+
options.setSsl(SSL);
95+
options.setLogin(CREDENTIALS.split(":")[0]);
96+
options.setPasscode(CREDENTIALS.split(":")[1]);
97+
98+
StompClient client = StompClient.create(Vertx.vertx(), options);
99+
client.connect(ar -> {
100+
if (ar.succeeded()) {
101+
StompClientConnection conn = ar.result();
102+
103+
Map<String, String> headers = new HashMap<>();
104+
headers.put("id", "1234"); // something unique
105+
106+
// To disable load-balancing (shared subscriptions):
107+
//headers.put("channel", CREDENTIALS.split(":")[0] + "something_unique");
108+
109+
// Subscribe to topic
110+
conn.subscribe(TOPIC, headers, ExampleFilesJSON::onMessage);
111+
}
112+
});
113+
client.close();
114+
}
115+
116+
public static void main(String[] args) {
117+
listen();
118+
}
119+
}

ExampleFilesRaw.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
import java.io.IOException;
2+
import java.nio.file.Files;
3+
import java.nio.file.Path;
4+
import java.text.SimpleDateFormat;
5+
import java.util.Base64;
6+
import java.util.Date;
7+
import java.util.HashMap;
8+
import java.util.Map;
9+
10+
import io.vertx.core.Vertx;
11+
import io.vertx.core.json.JsonObject;
12+
import io.vertx.ext.stomp.Frame;
13+
import io.vertx.ext.stomp.StompClient;
14+
import io.vertx.ext.stomp.StompClientConnection;
15+
import io.vertx.ext.stomp.StompClientOptions;
16+
17+
public class ExampleFilesRaw {
18+
19+
public static final String SERVER = "stream.abusix.net:61612";
20+
public static final boolean SSL = true;
21+
public static final String CREDENTIALS = "<user>:<pass>";
22+
public static final String TOPIC = "<topic>";
23+
24+
public static final SimpleDateFormat OUTPUT_DIR_FORMAT = new SimpleDateFormat("'filtered'/yyyy-MM-dd/HH/");
25+
26+
public static void onMessage(Frame frame) {
27+
Path target = Path.of(OUTPUT_DIR_FORMAT.format(new Date()));
28+
if (!target.toFile().exists()) {
29+
target.toFile().mkdirs();
30+
}
31+
32+
try {
33+
Files.write(target.resolve(frame.getBody().hashCode() + ".file"),
34+
Base64.getDecoder().decode(frame.getBodyAsByteArray()));
35+
} catch (IOException e) {
36+
System.err.println("Could not write file.");
37+
e.printStackTrace();
38+
}
39+
}
40+
41+
public static void listen() {
42+
StompClientOptions options = new StompClientOptions();
43+
options.setHeartbeat(new JsonObject().put("x", 10000).put("y", 10000));
44+
options.setHost(SERVER.split(":")[0]);
45+
options.setPort(Integer.parseInt(SERVER.split(":")[1]));
46+
options.setSsl(SSL);
47+
options.setLogin(CREDENTIALS.split(":")[0]);
48+
options.setPasscode(CREDENTIALS.split(":")[1]);
49+
50+
StompClient client = StompClient.create(Vertx.vertx(), options);
51+
client.connect(ar -> {
52+
if (ar.succeeded()) {
53+
StompClientConnection conn = ar.result();
54+
55+
Map<String, String> headers = new HashMap<>();
56+
headers.put("id", "1234"); // something unique
57+
58+
// To disable load-balancing (shared subscriptions):
59+
//headers.put("channel", CREDENTIALS.split(":")[0] + "something_unique");
60+
61+
// Subscribe to topic
62+
conn.subscribe(TOPIC, headers, ExampleFilesRaw::onMessage);
63+
}
64+
});
65+
client.close();
66+
}
67+
68+
public static void main(String[] args) {
69+
listen();
70+
}
71+
}

0 commit comments

Comments
 (0)