|
1 | 1 | package org.folio.orders.events.handlers;
|
2 | 2 |
|
3 |
| -import static org.folio.helper.CheckinReceivePiecesHelper.EXPECTED_STATUSES; |
4 |
| -import static org.folio.helper.CheckinReceivePiecesHelper.RECEIVED_STATUSES; |
5 |
| -import static org.folio.orders.utils.ResourcePathResolver.PIECES_STORAGE; |
6 |
| -import static org.folio.orders.utils.ResourcePathResolver.resourcesPath; |
7 |
| -import static org.folio.rest.jaxrs.model.PoLine.ReceiptStatus.AWAITING_RECEIPT; |
8 |
| -import static org.folio.rest.jaxrs.model.PoLine.ReceiptStatus.FULLY_RECEIVED; |
9 |
| -import static org.folio.rest.jaxrs.model.PoLine.ReceiptStatus.PARTIALLY_RECEIVED; |
| 3 | +import static org.folio.orders.events.utils.EventUtils.getPoLineId; |
| 4 | +import static org.folio.orders.utils.HelperUtils.getOkapiHeaders; |
| 5 | +import static org.folio.service.orders.utils.StatusUtils.calculatePoLineReceiptStatus; |
10 | 6 |
|
11 |
| -import java.util.ArrayList; |
12 | 7 | import java.util.List;
|
13 | 8 | import java.util.Map;
|
14 | 9 |
|
15 |
| -import org.apache.commons.collections4.CollectionUtils; |
16 | 10 | import org.folio.helper.BaseHelper;
|
17 | 11 | import org.folio.orders.utils.HelperUtils;
|
18 | 12 | import org.folio.orders.utils.PoLineCommonUtil;
|
19 |
| -import org.folio.rest.core.RestClient; |
20 | 13 | import org.folio.rest.core.models.RequestContext;
|
21 | 14 | import org.folio.rest.jaxrs.model.Piece;
|
22 |
| -import org.folio.rest.jaxrs.model.Piece.ReceivingStatus; |
23 |
| -import org.folio.rest.jaxrs.model.PieceCollection; |
24 | 15 | import org.folio.rest.jaxrs.model.PoLine;
|
25 |
| -import org.folio.rest.jaxrs.model.PoLine.ReceiptStatus; |
26 | 16 | import org.folio.service.orders.PurchaseOrderLineService;
|
| 17 | +import org.folio.service.pieces.PieceStorageService; |
27 | 18 | import org.springframework.beans.factory.annotation.Autowired;
|
28 | 19 | import org.springframework.stereotype.Component;
|
29 | 20 |
|
30 | 21 | import io.vertx.core.Future;
|
31 | 22 | import io.vertx.core.Handler;
|
32 |
| -import io.vertx.core.Promise; |
33 | 23 | import io.vertx.core.Vertx;
|
34 | 24 | import io.vertx.core.eventbus.Message;
|
35 | 25 | import io.vertx.core.json.JsonArray;
|
36 | 26 | import io.vertx.core.json.JsonObject;
|
37 |
| -import one.util.streamex.StreamEx; |
38 | 27 |
|
39 | 28 | @Component("receiptStatusHandler")
|
40 | 29 | public class ReceiptStatusConsistency extends BaseHelper implements Handler<Message<JsonObject>> {
|
41 | 30 |
|
42 |
| - private static final int LIMIT = Integer.MAX_VALUE; |
43 |
| - private static final String PIECES_ENDPOINT = resourcesPath(PIECES_STORAGE) + "?query=poLineId==%s&limit=%s"; |
44 |
| - |
| 31 | + private final PieceStorageService pieceStorageService; |
45 | 32 | private final PurchaseOrderLineService purchaseOrderLineService;
|
46 | 33 |
|
47 | 34 |
|
48 | 35 | @Autowired
|
49 |
| - public ReceiptStatusConsistency(Vertx vertx, PurchaseOrderLineService purchaseOrderLineService) { |
| 36 | + public ReceiptStatusConsistency(Vertx vertx, PieceStorageService pieceStorageService, PurchaseOrderLineService purchaseOrderLineService) { |
50 | 37 | super(vertx.getOrCreateContext());
|
| 38 | + this.pieceStorageService = pieceStorageService; |
51 | 39 | this.purchaseOrderLineService = purchaseOrderLineService;
|
52 | 40 | }
|
53 | 41 |
|
54 | 42 | @Override
|
55 | 43 | public void handle(Message<JsonObject> message) {
|
56 |
| - JsonObject messageFromEventBus = message.body(); |
57 |
| - |
| 44 | + var messageFromEventBus = message.body(); |
58 | 45 | logger.info("Received message body: {}", messageFromEventBus);
|
59 | 46 |
|
60 |
| - Map<String, String> okapiHeaders = org.folio.orders.utils.HelperUtils.getOkapiHeaders(message); |
| 47 | + var okapiHeaders = getOkapiHeaders(message); |
61 | 48 | var requestContext = new RequestContext(ctx, okapiHeaders);
|
62 |
| - List<Future<Void>> futures = new ArrayList<>(); |
63 |
| - Promise<Void> promise = Promise.promise(); |
64 |
| - futures.add(promise.future()); |
65 |
| - |
66 |
| - String poLineIdUpdate = messageFromEventBus.getString("poLineIdUpdate"); |
67 |
| - String query = String.format(PIECES_ENDPOINT, poLineIdUpdate, LIMIT); |
68 |
| - |
69 |
| - // 1. Get all pieces for poLineId |
70 |
| - getPieces(query, requestContext) |
71 |
| - .compose(listOfPieces -> |
72 |
| - // 2. Get PoLine for the poLineId which will be used to calculate PoLineReceiptStatus |
73 |
| - purchaseOrderLineService.getOrderLineById(poLineIdUpdate, requestContext) |
74 |
| - .map(poLine -> { |
75 |
| - if (PoLineCommonUtil.isCancelledOrOngoingStatus(poLine)) { |
76 |
| - promise.complete(); |
77 |
| - return null; |
78 |
| - } |
79 |
| - ReceiptStatus receivingStatus = calculatePoLineReceiptStatus(poLine, listOfPieces); |
80 |
| - boolean statusUpdated = purchaseOrderLineService.updatePoLineReceiptStatusWithoutSave(poLine, receivingStatus); |
81 |
| - if (statusUpdated) { |
82 |
| - purchaseOrderLineService.saveOrderLine(poLine, requestContext) |
83 |
| - .map(aVoid -> { |
84 |
| - // send event to update order status |
85 |
| - updateOrderStatus(poLine, okapiHeaders, requestContext); |
86 |
| - promise.complete(); |
87 |
| - return null; |
88 |
| - }) |
89 |
| - .onFailure(e -> { |
90 |
| - logger.error("The error updating poLine by id {}", poLineIdUpdate, e); |
91 |
| - promise.fail(e); |
92 |
| - }); |
93 |
| - } else { |
94 |
| - promise.complete(); |
95 |
| - } |
96 |
| - return null; |
97 |
| - }) |
98 |
| - .onFailure(e -> { |
99 |
| - logger.error("The error getting poLine by id {}", poLineIdUpdate, e); |
100 |
| - promise.fail(e); |
101 |
| - })) |
102 |
| - .onFailure(e -> { |
103 |
| - logger.error("The error happened getting all pieces by poLine {}", poLineIdUpdate, e); |
104 |
| - promise.fail(e); |
105 |
| - }); |
106 | 49 |
|
107 |
| - // Now wait for all operations to be completed and send reply |
108 |
| - completeAllFutures(futures, message); |
109 |
| - } |
| 50 | + var poLineId = getPoLineId(messageFromEventBus); |
| 51 | + var future = pieceStorageService.getPiecesByLineId(poLineId, requestContext) |
| 52 | + .compose(pieces -> purchaseOrderLineService.getOrderLineById(poLineId, requestContext) |
| 53 | + .compose(poLine -> updatePoLineAndOrderStatuses(pieces, poLine, requestContext)) |
| 54 | + .onFailure(e -> logger.error("Exception occurred while fetching PoLine by id: '{}'", poLineId, e))) |
| 55 | + .onFailure(e -> logger.error("Exception occurred while fetching pieces by PoLine id: '{}'", poLineId, e)); |
110 | 56 |
|
111 |
| - private void updateOrderStatus(PoLine poLine, Map<String, String> okapiHeaders, RequestContext requestContext) { |
112 |
| - List<JsonObject> poIds = StreamEx |
113 |
| - .of(poLine) |
114 |
| - .map(PoLine::getPurchaseOrderId) |
115 |
| - .distinct() |
116 |
| - .map(orderId -> new JsonObject().put(ORDER_ID, orderId)) |
117 |
| - .toList(); |
118 |
| - JsonObject messageContent = new JsonObject(); |
119 |
| - messageContent.put(OKAPI_HEADERS, okapiHeaders); |
120 |
| - // Collect order ids which should be processed |
121 |
| - messageContent.put(EVENT_PAYLOAD, new JsonArray(poIds)); |
122 |
| - HelperUtils.sendEvent(MessageAddress.RECEIVE_ORDER_STATUS_UPDATE, messageContent, requestContext); |
| 57 | + completeAllFutures(List.of(future), message); |
123 | 58 | }
|
124 | 59 |
|
125 |
| - private ReceiptStatus calculatePoLineReceiptStatus(PoLine poLine, List<Piece> pieces) { |
126 |
| - |
127 |
| - if (CollectionUtils.isEmpty(pieces)) { |
128 |
| - logger.info("No pieces processed - receipt status unchanged for PO Line '{}'", poLine.getId()); |
129 |
| - return poLine.getReceiptStatus(); |
| 60 | + private Future<Void> updatePoLineAndOrderStatuses(List<Piece> pieces, PoLine poLine, RequestContext requestContext) { |
| 61 | + if (PoLineCommonUtil.isCancelledOrOngoingStatus(poLine)) { |
| 62 | + logger.info("updatePoLineAndOrderStatuses:: PoLine with id: '{}' has status: '{}', skipping...", poLine.getId(), poLine.getReceiptStatus()); |
| 63 | + return Future.succeededFuture(); |
130 | 64 | }
|
131 |
| - |
132 |
| - long expectedQty = getPiecesQuantityByPoLineAndStatus(EXPECTED_STATUSES, pieces); |
133 |
| - return calculatePoLineReceiptStatus(expectedQty, pieces); |
134 |
| - } |
135 |
| - |
136 |
| - private ReceiptStatus calculatePoLineReceiptStatus(long expectedPiecesQuantity, List<Piece> pieces) { |
137 |
| - if (expectedPiecesQuantity == 0) { |
138 |
| - logger.info("calculatePoLineReceiptStatus:: Fully received"); |
139 |
| - return FULLY_RECEIVED; |
140 |
| - } |
141 |
| - |
142 |
| - if (StreamEx.of(pieces).anyMatch(piece -> RECEIVED_STATUSES.contains(piece.getReceivingStatus()))) { |
143 |
| - logger.info("calculatePoLineReceiptStatus:: Partially Received - In case there is at least one successfully received piece"); |
144 |
| - return PARTIALLY_RECEIVED; |
| 65 | + var newStatus = pieces.isEmpty() |
| 66 | + ? poLine.getReceiptStatus() |
| 67 | + : calculatePoLineReceiptStatus(poLine.getId(), pieces); |
| 68 | + boolean statusUpdated = purchaseOrderLineService.updatePoLineReceiptStatusWithoutSave(poLine, newStatus); |
| 69 | + if (!statusUpdated) { |
| 70 | + logger.info("updatePoLineAndOrderStatuses:: PoLine receipt status is not updated, skipping..."); |
| 71 | + return Future.succeededFuture(); |
145 | 72 | }
|
146 |
| - |
147 |
| - logger.info("calculatePoLineReceiptStatus::Pieces were rolled-back to Expected, checking if there is any Received piece in the storage"); |
148 |
| - long receivedQty = getPiecesQuantityByPoLineAndStatus(RECEIVED_STATUSES, pieces); |
149 |
| - return receivedQty == 0 ? AWAITING_RECEIPT : PARTIALLY_RECEIVED; |
| 73 | + return purchaseOrderLineService.saveOrderLine(poLine, requestContext) |
| 74 | + .compose(v -> updateOrderStatus(poLine, okapiHeaders, requestContext)) |
| 75 | + .onSuccess(v -> logger.info("updatePoLineAndOrderStatuses:: Order '{}' and PoLine '{}' updated successfully", poLine.getId(), poLine.getPurchaseOrderId())) |
| 76 | + .onFailure(e -> logger.error("Exception occurred while updating Order '{}' and PoLine '{}'", poLine.getId(), poLine.getPurchaseOrderId(), e)); |
150 | 77 | }
|
151 | 78 |
|
152 |
| - private long getPiecesQuantityByPoLineAndStatus(List<ReceivingStatus> receivingStatuses, List<Piece> pieces) { |
153 |
| - return pieces.stream() |
154 |
| - .filter(piece -> receivingStatuses.contains(piece.getReceivingStatus())) |
155 |
| - .count(); |
| 79 | + private Future<Void> updateOrderStatus(PoLine poLine, Map<String, String> okapiHeaders, RequestContext requestContext) { |
| 80 | + var messageContent = JsonObject.of( |
| 81 | + OKAPI_HEADERS, okapiHeaders, |
| 82 | + EVENT_PAYLOAD, JsonArray.of(JsonObject.of(ORDER_ID, poLine.getPurchaseOrderId())) |
| 83 | + ); |
| 84 | + HelperUtils.sendEvent(MessageAddress.RECEIVE_ORDER_STATUS_UPDATE, messageContent, requestContext); |
| 85 | + return Future.succeededFuture(); |
156 | 86 | }
|
157 | 87 |
|
158 |
| - Future<List<Piece>> getPieces(String endpoint, RequestContext requestContext) { |
159 |
| - return new RestClient().get(endpoint, PieceCollection.class, requestContext) |
160 |
| - .map(PieceCollection::getPieces); |
161 |
| - } |
162 | 88 | }
|
0 commit comments