Skip to content

Commit

Permalink
Message priority impr (#16)
Browse files Browse the repository at this point in the history
* [BUGFIX] fixing logging in http_handler

* [CODING] no more racing. Still, it would be better with an API and a custom type

* [CODING] simplified logging code

* [FEATURE] available doc_type's can be set via command line

* [CGL] Just CGL
  • Loading branch information
xf0e committed Jun 8, 2019
1 parent 0a1e442 commit 548383d
Show file tree
Hide file tree
Showing 5 changed files with 78 additions and 63 deletions.
7 changes: 4 additions & 3 deletions cli-httpd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import (
func init() {
zerolog.TimeFieldFormat = time.StampMilli
// Default level is info, unless debug flag is present
zerolog.SetGlobalLevel(zerolog.DebugLevel)
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}

func main() {
Expand Down Expand Up @@ -67,10 +67,11 @@ func main() {
)

}

rabbitConfig := ocrworker.DefaultConfigFlagsOverride(flagFunc)
if debug == true {
zerolog.SetGlobalLevel(zerolog.InfoLevel)
zerolog.SetGlobalLevel(zerolog.DebugLevel)
}
rabbitConfig := ocrworker.DefaultConfigFlagsOverride(flagFunc)

// any requests to root, just redirect to main page
http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand Down
10 changes: 5 additions & 5 deletions docs/idea_post_ocr_base64.http
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ POST http://localhost:8080/ocr-status
Content-Type: application/json

{
"img_url": "d5e09ea-8f66-41ab-536d-ee9a623b8766"
"img_url": "6fbef430-6b3b-4b62-7e5e-c964dcd59cb0"
}


Expand All @@ -161,7 +161,7 @@ POST http://localhost:8080/ocr
Content-Type: application/json

{
"img_url":"http://localhost:9898/big.pdf",
"img_url":"http://localhost:7777/big.pdf",
"engine":"sandwich",
"engine_args":{"lang":"eng", "ocr_type":"combinedpdf","result_optimize":true},
"deferred":true,
Expand All @@ -175,9 +175,9 @@ POST http://localhost:8080/ocr
Content-Type: application/json

{
"img_url":"http://localhost:9898/small.pdf",
"img_url":"http://localhost:7777/Antrag.pdf",
"engine":"sandwich",
"engine_args":{"lang":"eng", "ocr_type":"combinedpdf","result_optimize":true},
"engine_args":{"lang":"deu", "ocr_type":"combinedpdf","result_optimize":true},
"deferred":true,
"reply_to":"http://localhost:9999",
"doc_type": "egvp"
Expand Down Expand Up @@ -209,7 +209,7 @@ Content-Type: application/json
"engine_args":{"lang":"eng", "ocr_type":"txt","result_optimize":true},
"deferred":true,
"reply_to":"http://localhost:9999",
"doc_type": "eg",
"doc_type": "egvp",
"page_number": 2,
"time_out": 24,
"user_agent": "0xdeadbeef"
Expand Down
9 changes: 6 additions & 3 deletions ocr_http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,16 @@ func (s *OcrHTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
ServiceCanAcceptMu.Unlock()
if !serviceCanAcceptLocal && !appStopLocal {
err := "no resources available to process the request"
log.Error().Err(fmt.Errorf(err)).Str("component", "OCR_HTTP")
log.Warn().Str("component", "OCR_HTTP").Err(fmt.Errorf(err)).
Msg("conditions for accepting new requests are not met")
http.Error(w, err, 503)
return
}

if !serviceCanAcceptLocal && appStopLocal {
err := "service is going down"
log.Error().Err(fmt.Errorf(err)).Str("component", "OCR_HTTP")
log.Warn().Str("component", "OCR_HTTP").Err(fmt.Errorf(err)).
Msg("conditions for accepting new requests are not met")
http.Error(w, err, 503)
return
}
Expand All @@ -57,7 +59,8 @@ func (s *OcrHTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
decoder := json.NewDecoder(req.Body)
err := decoder.Decode(&ocrRequest)
if err != nil {
log.Error().Err(err).Str("component", "OCR_HTTP")
log.Warn().Str("component", "OCR_HTTP").Err(err).
Msg("did the client send a valid json?")
http.Error(w, "Unable to unmarshal json", 400)
return
}
Expand Down
96 changes: 45 additions & 51 deletions ocr_rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func newOcrResult(id string) OcrResult {
}

var (
requestsAndTimersMu sync.Mutex
requestsAndTimersMu sync.RWMutex
// Requests is for holding and monitoring queued requests
Requests = make(map[string]chan OcrResult)
timers = make(map[string]*time.Timer)
Expand All @@ -64,22 +64,22 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest OcrRequest, requestID string) (Ocr
var err error

logger := zerolog.New(os.Stdout).With().
Str("component", "OCR_CLIENT").
Uint("Timeout", ocrRequest.TimeOut).
Str("RequestID", requestID).Timestamp().Logger()

logger.Info().Str("component", "OCR_CLIENT").
Bool("Deferred", ocrRequest.Deferred).
logger.Info().Bool("Deferred", ocrRequest.Deferred).
Str("DocType", ocrRequest.DocType).
Interface("EngineArgs", ocrRequest.EngineArgs).
Bool("InplaceDecode", ocrRequest.InplaceDecode).
Uint16("PageNumber", ocrRequest.PageNumber).
Str("ReplyTo", ocrRequest.ReplyTo).
Str("UserAgent", ocrRequest.UserAgent).
Str("EngineType", string(ocrRequest.EngineType)).
Uint("TimeOut", ocrRequest.TimeOut).
Msg("incoming request")

if ocrRequest.ReplyTo != "" {
logger.Info().Str("component", "OCR_CLIENT").Msg("Automated response requested")
logger.Info().Msg("Automated response requested")
validURL, err := checkURLForReplyTo(ocrRequest.ReplyTo)
if err != nil {
return OcrResult{}, err
Expand All @@ -92,14 +92,15 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest OcrRequest, requestID string) (Ocr

var messagePriority uint8 = 1
if ocrRequest.DocType != "" {
logger.Info().Str("component", "OCR_CLIENT").Str("DocType", ocrRequest.DocType).
logger.Info().Str("DocType", ocrRequest.DocType).
Msg("message type is specified, check for higher prio request")
// set highest priority for defined message id
// TODO do not hard code DocType priority
if ocrRequest.DocType == "egvp" {
messagePriority = 9
logger.Debug().Interface("doc_types_available", c.rabbitConfig.QueuePrio)
if val, ok := c.rabbitConfig.QueuePrio[ocrRequest.DocType]; ok {
messagePriority = val
} else {
messagePriority = c.rabbitConfig.QueuePrio["standard"]
}

}
// setting the timeout for worker if not set or too high
if ocrRequest.TimeOut >= uint(3600) || ocrRequest.TimeOut == 0 {
Expand All @@ -108,7 +109,7 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest OcrRequest, requestID string) (Ocr

// setting rabbitMQ correlation ID. There is no reason to be different from requestID
correlationUUID := requestID
logger.Info().Str("component", "OCR_CLIENT").Str("DocType", ocrRequest.DocType).
logger.Info().Str("DocType", ocrRequest.DocType).
Str("AmqpURI", c.rabbitConfig.AmqpURI).
Msg("dialing RabbitMQ")

Expand Down Expand Up @@ -164,30 +165,25 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest OcrRequest, requestID string) (Ocr

// if we do not have bytes use base 64 file by converting it to bytes
if ocrRequest.hasBase64() {
logger.Info().Str("component", "OCR_CLIENT").Msg("OCR request has base 64 convert it to bytes")
logger.Info().Msg("OCR request has base 64 convert it to bytes")

err = ocrRequest.decodeBase64()
if err != nil {
logger.Warn().Str("component", "OCR_CLIENT").
Err(err).
Msg("Error decoding base64")
logger.Warn().Err(err).Msg("Error decoding base64")
return OcrResult{}, err
}
} else {
// if we do not have base 64 or bytes download the file
err = ocrRequest.downloadImgUrl()
if err != nil {
logger.Warn().Str("component", "OCR_CLIENT").
Err(err).
Msg("Error downloading img url")
logger.Warn().Err(err).Msg("Error downloading img url")
return OcrResult{}, err
}
}
}

routingKey := ocrRequest.nextPreprocessor(c.rabbitConfig.RoutingKey)
logger.Info().Str("component", "OCR_CLIENT").Str("routingKey", routingKey).
Msg("publishing with routing key")
logger.Info().Str("routingKey", routingKey).Msg("publishing with routing key")

ocrRequestJson, err := json.Marshal(ocrRequest)
if err != nil {
Expand Down Expand Up @@ -217,14 +213,14 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest OcrRequest, requestID string) (Ocr
// TODO on a deferred request, if you get the request by polling before it was
// TODO automatically delivered, then automatic delivery will POST an empty request back after timeout
if ocrRequest.Deferred {
logger.Info().Str("component", "OCR_CLIENT").Msg("Asynchronous request accepted")
logger.Info().Msg("Asynchronous request accepted")
timer := time.NewTimer(time.Duration(ResponseCacheTimeout) * time.Second)
logger.Debug().Str("component", "OCR_CLIENT").Msg("locking vrequestsAndTimersMu")
requestsAndTimersMu.Lock()
logger.Debug().Msg("locking vrequestsAndTimersMu")
requestsAndTimersMu.RLock()
Requests[requestID] = rpcResponseChan
timers[requestID] = timer
logger.Debug().Str("component", "OCR_CLIENT").Msg("unlocking vrequestsAndTimersMu")
requestsAndTimersMu.Unlock()
logger.Debug().Msg("unlocking vrequestsAndTimersMu")
requestsAndTimersMu.RUnlock()
// deferred == true but no automatic reply to the requester
// client should poll to get the ocr
if ocrRequest.ReplyTo == "" {
Expand All @@ -247,17 +243,14 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest OcrRequest, requestID string) (Ocr
for {
select {
case t := <-tickerWithPostAction.C:
logger.Info().Str("component", "OCR_CLIENT").
Str("time", t.String()).
Msg("checking for request to be done")
logger.Info().Str("time", t.String()).Msg("checking for request to be done")

ocrRes, err := CheckOcrStatusByID(requestID, false)
if err != nil {
logger.Error().Err(err)
} // only if status is done end the goroutine. otherwise continue polling
if ocrRes.Status == "done" || ocrRes.Status == "error" {
logger.Info().Str("component", "OCR_CLIENT").
Msg("request is ready")
logger.Info().Msg("request is ready")

var tryCounter uint8 = 1
ocrPostClient := newOcrPostClient()
Expand Down Expand Up @@ -341,8 +334,8 @@ func (c OcrRpcClient) subscribeCallbackQueue(correlationUUID string, rpcResponse
func (c OcrRpcClient) handleRpcResponse(deliveries <-chan amqp.Delivery, correlationUuid string, rpcResponseChan chan OcrResult) {
// correlationUuid is the same as RequestID
logger := zerolog.New(os.Stdout).With().
Str("RequestID", correlationUuid).Timestamp().Logger()
logger.Info().Str("component", "OCR_CLIENT").Msg("looping over deliveries...:")
Str("component", "OCR_CLIENT").Str("RequestID", correlationUuid).Timestamp().Logger()
logger.Info().Msg("looping over deliveries...:")
// TODO this defer is probably a memory leak
// defer c.connection.Close()
for d := range deliveries {
Expand All @@ -352,9 +345,7 @@ func (c OcrRpcClient) handleRpcResponse(deliveries <-chan amqp.Delivery, correla
if bodyLenToLog > 32 {
bodyLenToLog = 32
}
logger.Info().Str("component", "OCR_CLIENT").
Int("size", len(d.Body)).
Uint64("DeliveryTag", d.DeliveryTag).
logger.Info().Int("size", len(d.Body)).Uint64("DeliveryTag", d.DeliveryTag).
Hex("payload(32 Bytes)", d.Body[0:bodyLenToLog]).
Str("ReplyTo", d.ReplyTo).
Msg("got delivery")
Expand All @@ -368,13 +359,13 @@ func (c OcrRpcClient) handleRpcResponse(deliveries <-chan amqp.Delivery, correla
}
ocrResult.ID = correlationUuid

logger.Info().Str("component", "OCR_CLIENT").Msg("send result to rpcResponseChan")
logger.Info().Msg("send result to rpcResponseChan")
rpcResponseChan <- ocrResult
logger.Info().Str("component", "OCR_CLIENT").Msg("sent result to rpcResponseChan")
logger.Info().Msg("sent result to rpcResponseChan")
return

} else {
logger.Info().Str("component", "OCR_CLIENT").Str("CorrelationId", d.CorrelationId).
logger.Info().Str("CorrelationId", d.CorrelationId).
Msg("ignoring delivery w/ correlation id")
}

Expand All @@ -384,31 +375,34 @@ func (c OcrRpcClient) handleRpcResponse(deliveries <-chan amqp.Delivery, correla
// CheckOcrStatusByID checks status of an ocr request based on origin of request
func CheckOcrStatusByID(requestID string, httpStatusCheck bool) (OcrResult, error) {
log.Info().Str("component", "OCR_CLIENT").Msg("CheckOcrStatusByID called")
//log.Debug().Str("component", "OCR_CLIENT").Msg("locking vrequestsAndTimersMu CheckOcrStatusByID")
//requestsAndTimersMu.Lock()
requestsAndTimersMu.RLock()
if _, ok := Requests[requestID]; !ok {
//requestsAndTimersMu.Unlock()
requestsAndTimersMu.RUnlock()
return OcrResult{}, fmt.Errorf("no such request %s", requestID)
} else if ok && httpStatusCheck {
return OcrResult{Status: "processing", ID: requestID}, nil
}

log.Debug().Str("component", "OCR_CLIENT").Msg("getting ocrResult := <-Requests[requestID]")
ocrResult := <-Requests[requestID]
log.Debug().Str("component", "OCR_CLIENT").Msg("got ocrResult := <-Requests[requestID]")

log.Info().Str("component", "OCR_CLIENT").Msg("Number of messages in the queue:" +
fmt.Sprintf("%v", len(Requests)))
ocrResult := OcrResult{}
requestsAndTimersMu.RLock()
select {
case ocrResult = <-Requests[requestID]:
log.Debug().Str("component", "OCR_CLIENT").Msg("got ocrResult := <-Requests[requestID]")
default:
log.Info().Str("component", "OCR_CLIENT").Msg("Number of messages in the queue:" +
fmt.Sprintf("%v", len(Requests)))
}
requestsAndTimersMu.RUnlock()

if ocrResult.Status != "processing" {
log.Debug().Str("component", "OCR_CLIENT").Msg("deleting Requests and timers")
if ocrResult.Status != "processing" && ocrResult.ID != "" {
log.Debug().Str("component", "OCR_CLIENT").Msg("deleting from Requests and timers")
requestsAndTimersMu.RLock()
delete(Requests, requestID)
timers[requestID].Stop()
delete(timers, requestID)
requestsAndTimersMu.RUnlock()
}
//ocrResult.ID = requestID
//log.Debug().Str("component", "OCR_CLIENT").Msg("unlocking vrequestsAndTimersMu CheckOcrStatusByID")
//requestsAndTimersMu.Unlock()
return ocrResult, nil
}

Expand Down
19 changes: 18 additions & 1 deletion rabbit_config.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package ocrworker

import (
"encoding/json"
"flag"
"github.com/rs/zerolog/log"
)

type RabbitConfig struct {
Expand All @@ -11,10 +13,11 @@ type RabbitConfig struct {
RoutingKey string
Reliable bool
AmqpAPIURI string
APIPort string
APIPathQueue string
APIQueueName string
APIPathStats string
QueuePrio map[string]uint8
QueuePrioArg string
}

func DefaultTestConfig() RabbitConfig {
Expand All @@ -33,6 +36,7 @@ func DefaultTestConfig() RabbitConfig {
APIPathQueue: "/api/queues/%2f/",
APIQueueName: "decode-ocr",
APIPathStats: "/api/nodes",
QueuePrio: map[string]uint8{"standard": 1},
}
return rabbitConfig

Expand All @@ -50,6 +54,7 @@ func DefaultConfigFlagsOverride(flagFunction FlagFunction) RabbitConfig {
flagFunction()
var AmqpAPIURI string
var AmqpURI string
var QueuePrioArg string
flag.StringVar(
&AmqpURI,
"amqp_uri",
Expand All @@ -62,6 +67,12 @@ func DefaultConfigFlagsOverride(flagFunction FlagFunction) RabbitConfig {
"",
"The Amqp API URI, eg: http://guest:guest@localhost:15672/",
)
flag.StringVar(
&QueuePrioArg,
"queue_prio",
"",
"JSON formated list wich doc_type and corresponding prio ",
)

flag.Parse()
if len(AmqpURI) > 0 {
Expand All @@ -70,6 +81,12 @@ func DefaultConfigFlagsOverride(flagFunction FlagFunction) RabbitConfig {
if len(AmqpAPIURI) > 0 {
rabbitConfig.AmqpAPIURI = AmqpAPIURI
}
if len(QueuePrioArg) > 0 {
err := json.Unmarshal([]byte(QueuePrioArg), &rabbitConfig.QueuePrio)
if err != nil {
log.Fatal().Err(err).Msg("Message priority argument list is not in a proper JSON format eg. {\"egvp\":9}")
}
}

return rabbitConfig

Expand Down

0 comments on commit 548383d

Please sign in to comment.