Skip to content

Commit

Permalink
Merge pull request #123 from Opetushallitus/OY-5025_skannaustulosten_…
Browse files Browse the repository at this point in the history
…prosessointiongelma

OY-5025 skannaustulosten prosessointiongelma
  • Loading branch information
jkorri authored Jan 2, 2025
2 parents d003943 + 709c7fa commit 4ce69ae
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 47 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@
<dependency>
<groupId>org.apache.pdfbox</groupId>
<artifactId>pdfbox</artifactId>
<version>3.0.2</version>
<version>3.0.3</version>
</dependency>
<dependency>
<groupId>com.github.jai-imageio</groupId>
Expand Down
2 changes: 1 addition & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
[io.netty/netty-codec-http "4.1.60.Final"] ; fixes CVE-2021-21295
[com.typesafe.akka/akka-actor_2.12 "2.5.16"]
[io.undertow/undertow-core "2.3.5.Final"]
[org.apache.pdfbox/pdfbox "3.0.2"]]
[org.apache.pdfbox/pdfbox "3.0.3"]]

:dependencies [[org.clojure/clojure "1.11.2"]
[com.amazonaws/aws-java-sdk-s3 "1.12.740"]
Expand Down
37 changes: 15 additions & 22 deletions src/liiteri/preview/preview_generator.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
[liiteri.files.file-store :as file-store]
[liiteri.preview.interface :as interface]
[liiteri.preview.pdf :as pdf])
(:import (java.util.concurrent Executors TimeUnit ScheduledFuture FutureTask)))
(:import (java.util.concurrent Executors TimeUnit ScheduledFuture)))

(def content-types-to-process (concat pdf/content-types))

Expand All @@ -24,19 +24,7 @@
(count data-as-byte-array)
conn))

(defn with-timeout
([f ms]
(let [task (FutureTask. f)
thread (Thread. task)]
(try
(.start thread)
(.get task ms TimeUnit/MILLISECONDS)
(catch Exception e
(.cancel task true)
(.stop thread)
(throw e))))))

(defn generate-file-previews [config conn storage-engine file]
(defn generate-file-previews [config conn storage-engine file timeout-scheduler]
(let [start-time (System/currentTimeMillis)
{file-key :key
filename :filename
Expand All @@ -45,10 +33,14 @@
(log/info (format "Generating previews for '%s' with key '%s', uploaded on %s ..." filename file-key uploaded))
(with-open [input-stream (file-store/get-file storage-engine file-key)]
(let [preview-timeout-ms (get-in config [:preview-generator :preview-timeout-ms] 45000)
[page-count previews] (with-timeout #(interface/generate-previews-for-file storage-engine
[page-count previews] (.invokeAny timeout-scheduler [#(try
(interface/generate-previews-for-file storage-engine
file
input-stream
config) preview-timeout-ms)]
config)
(catch Throwable t
(log/error t "Error in generating previews task")
(throw t)))] preview-timeout-ms TimeUnit/MILLISECONDS)]
(doseq [[page-index preview-as-byte-array] (map-indexed vector previews)]
(let [preview-key (str file-key "." page-index)
preview-filename preview-key]
Expand Down Expand Up @@ -80,14 +72,14 @@
(metadata-store/set-file-page-count-and-preview-status! file-key nil "error" conn)
false))))

(defn- generate-next-preview [config db storage-engine]
(defn- generate-next-preview [config db storage-engine timeout-scheduler]
(try
(jdbc/with-db-transaction [tx db]
(let [conn {:connection tx}]
(if-let [file (metadata-store/get-file-without-preview conn content-types-to-process)]
(do
(reset! were-unprocessed-files-found-on-last-run true)
(generate-file-previews config conn storage-engine file))
(generate-file-previews config conn storage-engine file timeout-scheduler))
(do
(when @were-unprocessed-files-found-on-last-run
(log/info "Preview generation seems to be finished (or errored)."))
Expand All @@ -97,12 +89,12 @@
(log/error e "Failed to generate preview for the next file")
false)))

(defn- generate-previews [config db storage-engine]
(defn- generate-previews [config db storage-engine timeout-scheduler]
(try (loop []
(when (generate-next-preview config db storage-engine)
(when (generate-next-preview config db storage-engine timeout-scheduler)
(recur)))
(catch Throwable t
(println "Unexpected throwable!")
(log/error t "Unexpected throwable!")
(.printStackTrace t))))

(defprotocol Generator
Expand All @@ -115,7 +107,8 @@
(log/info "Starting document preview generation process...")
(let [poll-interval (get-in config [:preview-generator :poll-interval-seconds])
scheduler (Executors/newScheduledThreadPool 1)
preview-generator #(generate-previews config db storage-engine)
timeout-scheduler (Executors/newCachedThreadPool)
preview-generator #(generate-previews config db storage-engine timeout-scheduler)
time-unit TimeUnit/SECONDS
preview-generator-future (.scheduleAtFixedRate scheduler preview-generator 0 poll-interval time-unit)]
(log/info (str "Started document preview generation process, restarting at " poll-interval " " time-unit " intervals."))
Expand Down
33 changes: 15 additions & 18 deletions src/liiteri/virus_scan.clj
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
(ns liiteri.virus-scan
(:require [chime :as c]
[clojure.core.async :as a]
[clojure.java.jdbc :as jdbc]
(:require [clojure.java.jdbc :as jdbc]
[clojure.string :as string]
[clj-time.core :as t]
[clj-time.periodic :as p]
[com.stuartsierra.component :as component]
[liiteri.db.file-metadata-store :as metadata-store]
[liiteri.files.file-store :as file-store]
[liiteri.sqs-client :refer [get-sqs-client]]
[taoensso.timbre :as log]
[cheshire.core :as json])
(:import [com.amazonaws.services.sqs.model ReceiveMessageRequest]))
(:import [com.amazonaws.services.sqs.model ReceiveMessageRequest]
[java.util.concurrent Executors TimeUnit ScheduledFuture]))

(defn- log-virus-scan-result [file-key filename content-type status elapsed-time]
(let [status-str (string/upper-case (name status))]
Expand All @@ -28,6 +25,7 @@
(let [messages (-> (.receiveMessage sqs-client (-> (ReceiveMessageRequest. result-queue-url)
(.withWaitTimeSeconds (int 1)))) ; wait time of 1 second is to enable long polling which means we get answers from all sqs servers
(.getMessages))]
(log/info (str "Received " (.size messages) " virus scan results"))
(doseq [message messages]
(try
(let [message (json/parse-string (.getBody message) true)
Expand Down Expand Up @@ -77,24 +75,23 @@
result-queue-url (-> (.getQueueUrl sqs-poll-results-client result-queue-name)
(.getQueueUrl))
poll-interval (get-in config [:bucketav :poll-interval-seconds])
times (c/chime-ch (p/periodic-seq (t/now) (t/seconds poll-interval))
{:ch (a/chan (a/sliding-buffer 1))})
s3-bucket (get-in config [:file-store :s3 :bucket])]
(log/info "Starting virus scan results polling")
(a/go-loop []
(when-let [_ (a/<! times)]
(while (< 0 (poll-scan-results sqs-poll-results-client result-queue-url db storage-engine config)))
(recur)))
(assoc this :chan times
s3-bucket (get-in config [:file-store :s3 :bucket])

scheduler (Executors/newScheduledThreadPool 1)
virus-scan #(while (< 0 (poll-scan-results sqs-poll-results-client result-queue-url db storage-engine config)))
time-unit TimeUnit/SECONDS
virus-scan-future (.scheduleAtFixedRate scheduler virus-scan 0 poll-interval time-unit)]
(log/info (str "Started virus scan results polling process, restarting at " poll-interval " " time-unit " intervals."))
(assoc this :virus-scan-future virus-scan-future
:request-queue-url request-queue-url
:sqs-request-scan-client sqs-request-scan-client
:s3-bucket s3-bucket)))

(stop [this]
(when-let [chan (:chan this)]
(a/close! chan))
(when-let [^ScheduledFuture virus-scan-future (:virus-scan-future this)]
(.cancel virus-scan-future true))
(log/info "Stopped virus scan results polling")
(assoc this :chan nil
(assoc this :virus-scan-future nil
:request-queue-url nil
:sqs-request-scan-client nil
:s3-bucket nil))
Expand Down
12 changes: 7 additions & 5 deletions test/liiteri/preview/preview_generator_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@
[liiteri.test-utils :as u]
[liiteri.preview.preview-generator :as preview-generator]
[com.stuartsierra.component :as component]
[clj-time.core :as t])
(:import [java.sql Timestamp]))
[clj-time.core :as t]
[taoensso.timbre :as log])
(:import [java.sql Timestamp]
(java.util.concurrent Executors)))

(def system (atom (system/new-system {})))

Expand Down Expand Up @@ -65,7 +67,7 @@
(file-store/create-file store file-object filename)
(metadata-store/set-virus-scan-status! filename "done" conn)
(metadata-store/finalize-files [filename] origin-system origin-reference conn)
(preview-generator/generate-file-previews (:config @system) conn store file-spec)
(preview-generator/generate-file-previews (:config @system) conn store file-spec (Executors/newCachedThreadPool))

(let [file-metadata-after-preview (first (metadata-store/get-normalized-metadata! [filename] conn))
generated-previews (vec (metadata-store/get-previews filename conn))]
Expand Down Expand Up @@ -93,7 +95,7 @@
(file-store/create-file store file-object filename)
(metadata-store/set-virus-scan-status! filename "done" conn)
(metadata-store/finalize-files [filename] origin-system origin-reference conn)
(preview-generator/generate-file-previews (:config @system) conn store file-spec)
(preview-generator/generate-file-previews (:config @system) conn store file-spec (Executors/newCachedThreadPool))

(let [file-metadata-after-preview (first (metadata-store/get-normalized-metadata! [filename] conn))
generated-previews (vec (metadata-store/get-previews filename conn))]
Expand All @@ -119,7 +121,7 @@
(file-store/create-file store file-object filename)
(metadata-store/set-virus-scan-status! filename "done" conn)
(metadata-store/finalize-files [filename] origin-system origin-reference conn)
(preview-generator/generate-file-previews (:config @system) conn store file-spec)
(preview-generator/generate-file-previews (:config @system) conn store file-spec (Executors/newCachedThreadPool))

(file-store/delete-file-and-metadata (:key file-spec) "preview-generator-test" store conn false)

Expand Down

0 comments on commit 4ce69ae

Please sign in to comment.