|
1 | 1 | (ns liiteri.virus-scan
|
2 |
| - (:require [chime :as c] |
3 |
| - [clojure.core.async :as a] |
4 |
| - [clojure.java.jdbc :as jdbc] |
| 2 | + (:require [clojure.java.jdbc :as jdbc] |
5 | 3 | [clojure.string :as string]
|
6 |
| - [clj-time.core :as t] |
7 |
| - [clj-time.periodic :as p] |
8 | 4 | [com.stuartsierra.component :as component]
|
9 | 5 | [liiteri.db.file-metadata-store :as metadata-store]
|
10 | 6 | [liiteri.files.file-store :as file-store]
|
11 | 7 | [liiteri.sqs-client :refer [get-sqs-client]]
|
12 | 8 | [taoensso.timbre :as log]
|
13 | 9 | [cheshire.core :as json])
|
14 |
| - (:import [com.amazonaws.services.sqs.model ReceiveMessageRequest])) |
| 10 | + (:import [com.amazonaws.services.sqs.model ReceiveMessageRequest] |
| 11 | + [java.util.concurrent Executors TimeUnit ScheduledFuture])) |
15 | 12 |
|
16 | 13 | (defn- log-virus-scan-result [file-key filename content-type status elapsed-time]
|
17 | 14 | (let [status-str (string/upper-case (name status))]
|
|
28 | 25 | (let [messages (-> (.receiveMessage sqs-client (-> (ReceiveMessageRequest. result-queue-url)
|
29 | 26 | (.withWaitTimeSeconds (int 1)))) ; wait time of 1 second is to enable long polling which means we get answers from all sqs servers
|
30 | 27 | (.getMessages))]
|
| 28 | + (log/info (str "Received " (.size messages) " virus scan results")) |
31 | 29 | (doseq [message messages]
|
32 | 30 | (try
|
33 | 31 | (let [message (json/parse-string (.getBody message) true)
|
|
77 | 75 | result-queue-url (-> (.getQueueUrl sqs-poll-results-client result-queue-name)
|
78 | 76 | (.getQueueUrl))
|
79 | 77 | poll-interval (get-in config [:bucketav :poll-interval-seconds])
|
80 |
| - times (c/chime-ch (p/periodic-seq (t/now) (t/seconds poll-interval)) |
81 |
| - {:ch (a/chan (a/sliding-buffer 1))}) |
82 |
| - s3-bucket (get-in config [:file-store :s3 :bucket])] |
83 |
| - (log/info "Starting virus scan results polling") |
84 |
| - (a/go-loop [] |
85 |
| - (when-let [_ (a/<! times)] |
86 |
| - (log/info "Polling for virus scan results") |
87 |
| - (while (< 0 (poll-scan-results sqs-poll-results-client result-queue-url db storage-engine config))) |
88 |
| - (recur))) |
89 |
| - (assoc this :chan times |
| 78 | + s3-bucket (get-in config [:file-store :s3 :bucket]) |
| 79 | + |
| 80 | + scheduler (Executors/newScheduledThreadPool 1) |
| 81 | + virus-scan #(while (< 0 (poll-scan-results sqs-poll-results-client result-queue-url db storage-engine config))) |
| 82 | + time-unit TimeUnit/SECONDS |
| 83 | + virus-scan-future (.scheduleAtFixedRate scheduler virus-scan 0 poll-interval time-unit)] |
| 84 | + (log/info (str "Started virus scan results polling process, restarting at " poll-interval " " time-unit " intervals.")) |
| 85 | + (assoc this :virus-scan-future virus-scan-future |
90 | 86 | :request-queue-url request-queue-url
|
91 | 87 | :sqs-request-scan-client sqs-request-scan-client
|
92 | 88 | :s3-bucket s3-bucket)))
|
93 | 89 |
|
94 | 90 | (stop [this]
|
95 |
| - (when-let [chan (:chan this)] |
96 |
| - (a/close! chan)) |
| 91 | + (when-let [^ScheduledFuture virus-scan-future (:virus-scan-future this)] |
| 92 | + (.cancel virus-scan-future true)) |
97 | 93 | (log/info "Stopped virus scan results polling")
|
98 |
| - (assoc this :chan nil |
| 94 | + (assoc this :virus-scan-future nil |
99 | 95 | :request-queue-url nil
|
100 | 96 | :sqs-request-scan-client nil
|
101 | 97 | :s3-bucket nil))
|
|
0 commit comments