Skip to content

Commit

Permalink
Development (#53)
Browse files Browse the repository at this point in the history
* Format code with gofmt and gofumpt

This commit fixes the style issues introduced in a0ec5c1 according to the output
from gofmt and gofumpt.

Details: https://deepsource.io/gh/xf0e/open-ocr/transform/a4611f60-abb2-4cf6-9fc0-3a6b543b2555/

* [CGL] deprecated io/ioutil replaced by os. CheckOcrStatusByID refactored

* [BUGFIX] refactored CheckOcrStatusByID

* [CODING] corrected help annotation for prometheus in flight request metrics

* [CODING] corrected help annotation for prometheus requests counter

* [CGL] corrected wording in commentary about automatic removing requests from the queue

* [CODING] replaced deprecated c.channel.Public with PublishWithContext

* [CODING] removed unneeded check if map contains the request

* [CODING] reduced complexity of CheckOcrStatusByID

* Format code with gofmt and gofumpt

* [CODING] corrected response message for non existing requests

* [BUGFIX] fixed bug introduced in the prious release there the queue size could be negative rendered application fail to serve

* Format code with gofmt and gofumpt

* [CODING] show version in status page

* [CODING] Control-coupled functions CheckForAcceptRequest resolved

* [CODING] calls to os.Exit only in main() or init() functions

* [CODING] comparing unsigned values against negative values is pointless

* [CODING] func newOcrResult was unused

* [CODING] this value of tempChannel is never used

* Fix unused method receiver (#54)

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>

* Unused parameter should be replaced by underscore (#55)

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>

Co-authored-by: deepsource-autofix[bot] <62050782+deepsource-autofix[bot]@users.noreply.github.com>
Co-authored-by: Artem Mil <[email protected]>
  • Loading branch information
3 people authored Aug 12, 2022
1 parent d4a4f7d commit 0773485
Show file tree
Hide file tree
Showing 16 changed files with 92 additions and 112 deletions.
9 changes: 4 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,16 @@ GO_FILES := $(shell find . -name 'main.go' | grep -v /vendor/)
all: run

release:
go build -o ${OUT_WORKER} -buildmode=pie -a -tags 'netgo,static_build' -trimpath -ldflags="-linkmode external -s -w -extldflags '--static-pie' -X 'github.com/xf0e/open-ocr.buildTime=${DATE}' \
-X 'github.com/xf0e/open-ocr.sha1ver=${SHA1VER}' -X 'github.com/xf0e/open-ocr.version=${VERSION}'" cli-worker/main.go
go build -o ${OUT_WORKER} -buildmode=pie -a -tags 'netgo,static_build' -trimpath -ldflags="-linkmode external -s -w -extldflags '--static-pie' -X main.buildTime=${DATE} \
-X main.sha1ver=${SHA1VER} -X 'github.com/xf0e/open-ocr.version=${VERSION}' -X main.version=${VERSION}'" cli-worker/main.go
go build -o ${OUT_HTTPD} -buildmode=pie -a -tags 'netgo,static_build' -trimpath -ldflags="-linkmode external -s -w -extldflags '--static-pie' -X main.buildTime=${DATE} \
-X main.sha1ver=${SHA1VER} -X main.version=${VERSION} -X 'github.com/xf0e/open-ocr.version=${VERSION}'" cli-httpd/main.go
go build -o ${OUT_PREPROCESSOR} -buildmode=pie -a -tags 'netgo,static_build' -trimpath -ldflags="-linkmode external -s -w -extldflags '--static-pie' -X main.buildTime=${DATE} \
-X main.sha1ver=${SHA1VER} -X main.version=${VERSION}" cli-preprocessor/main.go

debug:
@go build -o ${OUT_WORKER} -buildmode=pie -a -tags netgo -ldflags="-w -X github.com/xf0e/open-ocr.buildTime=${DATE} \
-X github.com/xf0e/open-ocr.sha1ver=${SHA1VER} -X github.com/xf0e/open-ocr.version=${VERSION}" cli-worker/main.go
@go build -o ${OUT_WORKER} -buildmode=pie -a -tags netgo -ldflags="-w -X main.buildTime=${DATE} \
-X main.sha1ver=${SHA1VER} -X main.version=${VERSION} -X github.com/xf0e/open-ocr.version=${VERSION}" cli-worker/main.go
@go build -o ${OUT_HTTPD} -buildmode=pie -a -tags netgo -ldflags="-w -X main.buildTime=${DATE} \
-X main.sha1ver=${SHA1VER} -X main.version=${VERSION}" cli-httpd/main.go
@go build -o ${OUT_PREPROCESSOR} -buildmode=pie -a -tags netgo -ldflags="-w -X main.buildTime=${DATE} \
Expand Down Expand Up @@ -50,4 +50,3 @@ clean:
-@rm ${OUT_WORKER} ${OUT_HTTPD} ${OUT_PREPROCESSOR}

.PHONY: run release static vet lint

2 changes: 1 addition & 1 deletion cli-httpd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func init() {
func handleIndex(writer http.ResponseWriter, _ *http.Request) {
writer.Header().Add("Strict-Transport-Security", "max-age=63072000; includeSubDomains")
appStopLocal = ocrworker.AppStop
text := ocrworker.GenerateLandingPage(appStopLocal, ocrworker.TechnicalErrorResManager)
text := ocrworker.GenerateLandingPage(appStopLocal, ocrworker.TechnicalErrorResManager, version)
_, _ = fmt.Fprint(writer, text)
}

Expand Down
15 changes: 15 additions & 0 deletions cli-worker/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package main

import (
"fmt"
"net/url"
"os"

// _ "net/http/pprof"
"time"

Expand All @@ -10,6 +13,12 @@ import (
"github.com/xf0e/open-ocr"
)

var (
sha1ver string
buildTime string
version string
)

// This assumes that there is a rabbit mq running
// To test it, fire up a web server and send it a curl request

Expand All @@ -26,6 +35,12 @@ func main() {
log.Panic().Str("component", "OCR_WORKER").
Msgf("error getting arguments: %v ", err)
}

if workerConfig.FlgVersion {
fmt.Printf("version %s. Build on %s from git commit hash %s\n", version, buildTime, sha1ver)
os.Exit(0)
}

if workerConfig.Debug {
zerolog.SetGlobalLevel(zerolog.DebugLevel)
}
Expand Down
4 changes: 2 additions & 2 deletions generate_landing_page.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func randomMOTD() string {
}

// GenerateLandingPage will generate a simple landing page
func GenerateLandingPage(appStop, technicalError bool) string {
func GenerateLandingPage(appStop, technicalError bool, version string) string {
statusArray := [4]string{}

if technicalError {
Expand Down Expand Up @@ -177,7 +177,7 @@ pre {
font-family: "Press Start 2P";
}
</style></head><body><section class="nes-container with-title"> <h2 class="title">Open-ocr ></h2>
</style></head><body><section class="nes-container with-title"> <h2 class="title">Open-ocr ` + version + ` ></h2>
<div class="nes-balloon from-left">
<p>` + statusArray[3] + `</p>
</div>
Expand Down
2 changes: 1 addition & 1 deletion mock_engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ const MockEngineResponse = "mock engine decoder response"
type MockEngine struct{}

// ProcessRequest will process incoming OCR request by routing it through the whole process chain
func (m MockEngine) ProcessRequest(ocrRequest *OcrRequest, workerConfig *WorkerConfig) (OcrResult, error) {
func (MockEngine) ProcessRequest(_ *OcrRequest, _ *WorkerConfig) (OcrResult, error) {
return OcrResult{Text: MockEngineResponse}, nil
}
4 changes: 2 additions & 2 deletions ocr_http_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (s *OcrHTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
// HandleOcrRequest will process incoming OCR request by routing it through the whole process chain
func HandleOcrRequest(ocrRequest *OcrRequest, workerConfig *RabbitConfig) (OcrResult, int, error) {
httpStatus := 200
ocrResult := newOcrResult(ocrRequest.RequestID)
// ocrResult := newOcrResult(ocrRequest.RequestID)
// set the context for zerolog, RequestID will be printed on each logging event
logger := zerolog.New(os.Stdout).With().
Str("RequestID", ocrRequest.RequestID).Timestamp().Logger()
Expand All @@ -135,7 +135,7 @@ func HandleOcrRequest(ocrRequest *OcrRequest, workerConfig *RabbitConfig) (OcrRe
return OcrResult{}, httpStatus, err
}

ocrResult, httpStatus, err = ocrClient.DecodeImage(ocrRequest)
ocrResult, httpStatus, err := ocrClient.DecodeImage(ocrRequest)
if err != nil {
logger.Error().Err(err).Str("component", "OCR_HTTP")
return OcrResult{}, httpStatus, err
Expand Down
4 changes: 2 additions & 2 deletions ocr_http_status_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func NewOcrHttpStatusHandler() *OcrHttpStatusHandler {
return &OcrHttpStatusHandler{}
}

func (s *OcrHttpStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
func (*OcrHttpStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
log.Debug().Str("component", "OCR_STATUS").Msg("OcrHttpStatusHandler called")

ocrRequest := OcrRequest{}
Expand All @@ -32,7 +32,7 @@ func (s *OcrHttpStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Reques
ocrResult.Status = "not found"
log.Info().Str("component", "OCR_STATUS").Str("RequestID", ocrRequest.ImgUrl).
Str("RemoteAddr", req.RemoteAddr).
Msg("no such ocr request. timeout was probably reached for this request ID")
Msg("no such ocr request, processing time limit was probably reached for this request")
}

w.Header().Set("Content-Type", "application/json")
Expand Down
5 changes: 4 additions & 1 deletion ocr_postback_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,10 @@ import (
"github.com/rs/zerolog"
)

var postTimeout = 50 * time.Second
var (
postTimeout = 50 * time.Second
version string
)

type ocrPostClient struct{}

Expand Down
35 changes: 16 additions & 19 deletions ocr_res_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var (
)

// CheckForAcceptRequest will check by reading the RabbitMQ API if resources for incoming request are available
func CheckForAcceptRequest(urlQueue, urlStat string, statusChanged bool) bool {
func CheckForAcceptRequest(urlQueue, urlStat string) bool {
isAvailable := false
TechnicalErrorResManager = false
jsonQueueStat, err := url2bytes(urlQueue)
Expand Down Expand Up @@ -91,23 +91,6 @@ func CheckForAcceptRequest(urlQueue, urlStat string, statusChanged bool) bool {
TechnicalErrorResManager = false
isAvailable = true
}

if statusChanged {
log.Info().Str("component", "OCR_RESMAN").
Uint("MessageBytes", queueManager.MessageBytes).
Uint("NumConsumers", queueManager.NumConsumers).
Uint("NumMessages", queueManager.NumMessages).
Interface("resManager", resManager).
Msg("OCR_RESMAN stats")

if isAvailable {
log.Info().Str("component", "OCR_RESMAN").Msg("open-ocr is operational with free resources, we are ready to serve")
} else {
log.Info().Str("component", "OCR_RESMAN").Msg("open-ocr is alive but won't serve any requests; workers are busy or not connected")
}

}

return isAvailable
}

Expand Down Expand Up @@ -163,9 +146,23 @@ Loop:
default:
// only print the RESMAN output if the state has changed
ServiceCanAcceptMu.Lock()
boolOldValue, boolCurValue = boolCurValue, CheckForAcceptRequest(urlQueue, urlStat, boolCurValue != boolOldValue)
boolOldValue, boolCurValue = boolCurValue, CheckForAcceptRequest(urlQueue, urlStat)
ServiceCanAccept = boolCurValue
ServiceCanAcceptMu.Unlock()
if boolCurValue != boolOldValue {
log.Info().Str("component", "OCR_RESMAN").
Uint("MessageBytes", queueManager.MessageBytes).
Uint("NumConsumers", queueManager.NumConsumers).
Uint("NumMessages", queueManager.NumMessages).
Interface("resManager", resManager).
Msg("OCR_RESMAN stats")

if boolCurValue {
log.Info().Str("component", "OCR_RESMAN").Msg("open-ocr is operational with free resources, we are ready to serve")
} else {
log.Info().Str("component", "OCR_RESMAN").Msg("open-ocr is alive but won't serve any requests; workers are busy or not connected")
}
}
time.Sleep(sleepFor * time.Second)
}
}
Expand Down
60 changes: 17 additions & 43 deletions ocr_results_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,68 +8,42 @@ import (
var (
RequestsTrack = sync.Map{}
RequestTrackLength = uint32(0)
// ocrWasSentBackChan = make(chan string)
)

// CheckOcrStatusByID checks status of an ocr request based on origin of request
func CheckOcrStatusByID(requestID string) (OcrResult, bool) {
if _, ok := RequestsTrack.Load(requestID); !ok {
// log.Info().Str("component", "OCR_CLIENT").Str("RequestID", requestID).Msg("no such request found in the queue")
return OcrResult{}, false
}

ocrResult := OcrResult{}

tempChannel := make(chan OcrResult)
v, ok := RequestsTrack.Load(requestID)
if ok {
tempChannel = v.(chan OcrResult)
tempChannel := v.(chan OcrResult)
ocrResult := OcrResult{}
select {
case ocrResult = <-tempChannel:
defer deleteRequestFromQueue(requestID)
// log.Debug().Str("component", "OCR_CLIENT").Msg("got ocrResult := <-Requests[requestID]")
return ocrResult, true
default:
return OcrResult{Status: "processing", ID: requestID}, true
}
} else {
// log.Debug().Str("component", "OCR_CLIENT").Str("RequestID", requestID).Msg("no such request found in the queue")
return OcrResult{}, false
}

select {
case ocrResult = <-tempChannel:
// log.Debug().Str("component", "OCR_CLIENT").Msg("got ocrResult := <-Requests[requestID]")
defer deleteRequestFromQueue(requestID)
default:
return OcrResult{Status: "processing", ID: requestID}, true
}

return ocrResult, true
}

func getQueueLen() uint {
return uint(atomic.LoadUint32(&RequestTrackLength))
}

func deleteRequestFromQueue(requestID string) {
inFlightGauge.Dec()
atomic.AddUint32(&RequestTrackLength, ^uint32(0))
RequestsTrack.Delete(requestID)
if _, ok := RequestsTrack.Load(requestID); ok {
atomic.AddUint32(&RequestTrackLength, ^uint32(0))
inFlightGauge.Dec()
RequestsTrack.Delete(requestID)
}
}

func addNewOcrResultToQueue(requestID string, rpcResponseChan chan OcrResult) {
inFlightGauge.Inc()
atomic.AddUint32(&RequestTrackLength, 1)
inFlightGauge.Inc()
RequestsTrack.Store(requestID, rpcResponseChan)

// this go routine will cancel the request after global timeout or if request was sent back
// if the requestID arrives on ocrWasSentBackChan - ocrResult was send back to requester an request deletion is triggered
// go func() {
// select {
// case <-ocrWasSentBackChan:
// if _, ok := RequestsTrack.Load(requestID); ok {
// deleteRequestFromQueue(requestID)
// }
// // TODO: a bug leaking goroutines if the global timeout is set to a low value the routine in ocr_rpc_client:221 will leak
// // TODO since there is no listener in this goroutine since this goroutine is dead
// /* case <-time.After(time.Second * time.Duration(storageTime+10)):
// if _, ok := RequestsTrack.Load(requestID); ok {
// deleteRequestFromQueue(requestID)
// }*/
// //default:
//
// }
// }()
}
30 changes: 15 additions & 15 deletions ocr_rpc_client.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ocrworker

import (
"context"
"encoding/json"
"fmt"
"net/url"
Expand All @@ -16,6 +17,7 @@ import (

// rpcResponseTimeout sets timeout for getting the result from channel
var rpcResponseTimeout = time.Second * 20
var numRetries uint = 3

type OcrRpcClient struct {
rabbitConfig RabbitConfig
Expand All @@ -29,15 +31,6 @@ type OcrResult struct {
ID string `json:"id"`
}

func newOcrResult(id string) OcrResult {
ocrResult := &OcrResult{}
ocrResult.Status = "processing"
ocrResult.ID = id
return *ocrResult
}

var numRetries uint = 3

func NewOcrRpcClient(rc *RabbitConfig) (*OcrRpcClient, error) {
ocrRpcClient := &OcrRpcClient{
rabbitConfig: *rc,
Expand Down Expand Up @@ -177,7 +170,11 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest *OcrRequest) (OcrResult, int, erro
if err != nil {
return OcrResult{}, 500, err
}
if err = c.channel.Publish(
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

if err = c.channel.PublishWithContext(
ctx,
c.rabbitConfig.Exchange, // publish to an exchange
routingKey,
false, // mandatory
Expand All @@ -204,7 +201,7 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest *OcrRequest) (OcrResult, int, erro
// deferred == true but no automatic reply to the requester
// client should poll to get the ocr
if ocrRequest.ReplyTo == "" {
// this go routine will cancel the request after global timeout or if requester doesn't recall the request
// this go routine will cancel the request after global timeout or if requester doesn't retrieve the request
logger.Info().Msg("deferred request without reply-to address set, will decay automatically after " + strconv.FormatUint(uint64(ocrRequest.TimeOut), 10) + " seconds")
go func() {
timeout := time.After(time.Second * time.Duration(ocrRequest.TimeOut+10))
Expand All @@ -214,7 +211,7 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest *OcrRequest) (OcrResult, int, erro
case <-timeout:
if _, ok := RequestsTrack.Load(ocrRequest.RequestID); ok {
deleteRequestFromQueue(ocrRequest.RequestID)
logger.Info().Msg("deferred request without reply-to address has decayed")
logger.Info().Msg("deferred request without reply-to address has decayed, client doesn't claimed request in time")
break Loop
}
default:
Expand All @@ -230,13 +227,16 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest *OcrRequest) (OcrResult, int, erro
ID: ocrRequest.RequestID,
}, 200, nil
}
// automatic delivery oder POST to the requester
// automatic delivery POST to the requester
// check interval for order to be ready to deliver
go func(requestID string) {
// trigger deleting request from internal queue
defer func() {
// ocrWasSentBackChan <- requestID
// if _, ok := RequestsTrack.Load(ocrRequest.RequestID); ok {
// deleteRequestFromQueue(ocrRequest.RequestID)
logger.Info().Msg("request handling finished, deleting it from the queue")
deleteRequestFromQueue(requestID)
// }
}()
ocrRes := OcrResult{ID: ocrRequest.RequestID, Status: "error", Text: ""}
ocrPostClient := newOcrPostClient()
Expand Down Expand Up @@ -280,7 +280,7 @@ func (c *OcrRpcClient) DecodeImage(ocrRequest *OcrRequest) (OcrResult, int, erro
ID: ocrRequest.RequestID,
Status: "processing",
}, 200, nil
} else {
} else { // handle not deferred request
select {
case ocrResult := <-rpcResponseChan:
// logger.Debug().Str("st", ocrResult.Status).Str("text", ocrResult.Text).Str("id", ocrResult.ID)
Expand Down
2 changes: 1 addition & 1 deletion preprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ type Preprocessor interface {

type IdentityPreprocessor struct{}

func (i IdentityPreprocessor) preprocess(ocrRequest *OcrRequest) error {
func (IdentityPreprocessor) preprocess(_ *OcrRequest) error {
return nil
}
Loading

0 comments on commit 0773485

Please sign in to comment.