diff --git a/pom.xml b/pom.xml index c4bc871..2b64538 100644 --- a/pom.xml +++ b/pom.xml @@ -318,7 +318,7 @@ org.apache.pdfbox pdfbox - 3.0.2 + 3.0.3 com.github.jai-imageio diff --git a/project.clj b/project.clj index 85ed2d5..75d41be 100644 --- a/project.clj +++ b/project.clj @@ -17,7 +17,7 @@ [io.netty/netty-codec-http "4.1.60.Final"] ; fixes CVE-2021-21295 [com.typesafe.akka/akka-actor_2.12 "2.5.16"] [io.undertow/undertow-core "2.3.5.Final"] - [org.apache.pdfbox/pdfbox "3.0.2"]] + [org.apache.pdfbox/pdfbox "3.0.3"]] :dependencies [[org.clojure/clojure "1.11.2"] [com.amazonaws/aws-java-sdk-s3 "1.12.740"] diff --git a/src/liiteri/preview/preview_generator.clj b/src/liiteri/preview/preview_generator.clj index 376fdba..08ae1f3 100644 --- a/src/liiteri/preview/preview_generator.clj +++ b/src/liiteri/preview/preview_generator.clj @@ -6,7 +6,7 @@ [liiteri.files.file-store :as file-store] [liiteri.preview.interface :as interface] [liiteri.preview.pdf :as pdf]) - (:import (java.util.concurrent Executors TimeUnit ScheduledFuture FutureTask))) + (:import (java.util.concurrent Executors TimeUnit ScheduledFuture))) (def content-types-to-process (concat pdf/content-types)) @@ -24,19 +24,7 @@ (count data-as-byte-array) conn)) -(defn with-timeout - ([f ms] - (let [task (FutureTask. f) - thread (Thread. task)] - (try - (.start thread) - (.get task ms TimeUnit/MILLISECONDS) - (catch Exception e - (.cancel task true) - (.stop thread) - (throw e)))))) - -(defn generate-file-previews [config conn storage-engine file] +(defn generate-file-previews [config conn storage-engine file timeout-scheduler] (let [start-time (System/currentTimeMillis) {file-key :key filename :filename @@ -45,10 +33,14 @@ (log/info (format "Generating previews for '%s' with key '%s', uploaded on %s ..." filename file-key uploaded)) (with-open [input-stream (file-store/get-file storage-engine file-key)] (let [preview-timeout-ms (get-in config [:preview-generator :preview-timeout-ms] 45000) - [page-count previews] (with-timeout #(interface/generate-previews-for-file storage-engine + [page-count previews] (.invokeAny timeout-scheduler [#(try + (interface/generate-previews-for-file storage-engine file input-stream - config) preview-timeout-ms)] + config) + (catch Throwable t + (log/error t "Error in generating previews task") + (throw t)))] preview-timeout-ms TimeUnit/MILLISECONDS)] (doseq [[page-index preview-as-byte-array] (map-indexed vector previews)] (let [preview-key (str file-key "." page-index) preview-filename preview-key] @@ -80,14 +72,14 @@ (metadata-store/set-file-page-count-and-preview-status! file-key nil "error" conn) false)))) -(defn- generate-next-preview [config db storage-engine] +(defn- generate-next-preview [config db storage-engine timeout-scheduler] (try (jdbc/with-db-transaction [tx db] (let [conn {:connection tx}] (if-let [file (metadata-store/get-file-without-preview conn content-types-to-process)] (do (reset! were-unprocessed-files-found-on-last-run true) - (generate-file-previews config conn storage-engine file)) + (generate-file-previews config conn storage-engine file timeout-scheduler)) (do (when @were-unprocessed-files-found-on-last-run (log/info "Preview generation seems to be finished (or errored).")) @@ -97,12 +89,12 @@ (log/error e "Failed to generate preview for the next file") false))) -(defn- generate-previews [config db storage-engine] +(defn- generate-previews [config db storage-engine timeout-scheduler] (try (loop [] - (when (generate-next-preview config db storage-engine) + (when (generate-next-preview config db storage-engine timeout-scheduler) (recur))) (catch Throwable t - (println "Unexpected throwable!") + (log/error t "Unexpected throwable!") (.printStackTrace t)))) (defprotocol Generator @@ -115,7 +107,8 @@ (log/info "Starting document preview generation process...") (let [poll-interval (get-in config [:preview-generator :poll-interval-seconds]) scheduler (Executors/newScheduledThreadPool 1) - preview-generator #(generate-previews config db storage-engine) + timeout-scheduler (Executors/newCachedThreadPool) + preview-generator #(generate-previews config db storage-engine timeout-scheduler) time-unit TimeUnit/SECONDS preview-generator-future (.scheduleAtFixedRate scheduler preview-generator 0 poll-interval time-unit)] (log/info (str "Started document preview generation process, restarting at " poll-interval " " time-unit " intervals.")) diff --git a/src/liiteri/virus_scan.clj b/src/liiteri/virus_scan.clj index 17832ba..22349b3 100644 --- a/src/liiteri/virus_scan.clj +++ b/src/liiteri/virus_scan.clj @@ -1,17 +1,14 @@ (ns liiteri.virus-scan - (:require [chime :as c] - [clojure.core.async :as a] - [clojure.java.jdbc :as jdbc] + (:require [clojure.java.jdbc :as jdbc] [clojure.string :as string] - [clj-time.core :as t] - [clj-time.periodic :as p] [com.stuartsierra.component :as component] [liiteri.db.file-metadata-store :as metadata-store] [liiteri.files.file-store :as file-store] [liiteri.sqs-client :refer [get-sqs-client]] [taoensso.timbre :as log] [cheshire.core :as json]) - (:import [com.amazonaws.services.sqs.model ReceiveMessageRequest])) + (:import [com.amazonaws.services.sqs.model ReceiveMessageRequest] + [java.util.concurrent Executors TimeUnit ScheduledFuture])) (defn- log-virus-scan-result [file-key filename content-type status elapsed-time] (let [status-str (string/upper-case (name status))] @@ -28,6 +25,7 @@ (let [messages (-> (.receiveMessage sqs-client (-> (ReceiveMessageRequest. result-queue-url) (.withWaitTimeSeconds (int 1)))) ; wait time of 1 second is to enable long polling which means we get answers from all sqs servers (.getMessages))] + (log/info (str "Received " (.size messages) " virus scan results")) (doseq [message messages] (try (let [message (json/parse-string (.getBody message) true) @@ -77,24 +75,23 @@ result-queue-url (-> (.getQueueUrl sqs-poll-results-client result-queue-name) (.getQueueUrl)) poll-interval (get-in config [:bucketav :poll-interval-seconds]) - times (c/chime-ch (p/periodic-seq (t/now) (t/seconds poll-interval)) - {:ch (a/chan (a/sliding-buffer 1))}) - s3-bucket (get-in config [:file-store :s3 :bucket])] - (log/info "Starting virus scan results polling") - (a/go-loop [] - (when-let [_ (a/