@@ -48,6 +48,8 @@ pub(crate) fn expand_batch_delete(
48
48
client: & self . client,
49
49
write_requests,
50
50
table_name: self . table_name( ) ,
51
+ policy: self . retry_condition. strategy. policy( ) ,
52
+ condition: & self . retry_condition,
51
53
}
52
54
}
53
55
}
@@ -86,6 +88,8 @@ pub(crate) fn expand_batch_delete(
86
88
client: & self . client,
87
89
write_requests,
88
90
table_name: self . table_name( ) ,
91
+ policy: self . retry_condition. strategy. policy( ) ,
92
+ condition: & self . retry_condition,
89
93
}
90
94
}
91
95
}
@@ -95,12 +99,12 @@ pub(crate) fn expand_batch_delete(
95
99
let api_call_token = super :: api_call_token!( "batch_write_item" ) ;
96
100
let ( call_inner_run, inner_run_args) = if cfg ! ( feature = "tracing" ) {
97
101
(
98
- quote ! { #builder_name:: inner_run( & self . table_name, & self . client, input) . await ? } ,
99
- quote ! { table_name: & str , } ,
102
+ quote ! { #builder_name:: inner_run( table_name, client, input) . await } ,
103
+ quote ! { table_name: String , } ,
100
104
)
101
105
} else {
102
106
(
103
- quote ! { #builder_name:: inner_run( & self . client, input) . await ? } ,
107
+ quote ! { #builder_name:: inner_run( client, input) . await } ,
104
108
quote ! { } ,
105
109
)
106
110
} ;
@@ -112,17 +116,22 @@ pub(crate) fn expand_batch_delete(
112
116
pub client: & ' a :: raiden:: DynamoDbClient ,
113
117
pub write_requests: std:: vec:: Vec <:: raiden:: WriteRequest >,
114
118
pub table_name: String ,
119
+ pub policy: :: raiden:: Policy ,
120
+ pub condition: & ' a :: raiden:: retry:: RetryCondition ,
115
121
}
116
122
117
123
impl <' a> #builder_name<' a> {
118
- pub async fn run( mut self ) -> Result <:: raiden:: batch_delete:: BatchDeleteOutput , :: raiden:: RaidenError > {
124
+ pub async fn run( self ) -> Result <:: raiden:: batch_delete:: BatchDeleteOutput , :: raiden:: RaidenError > {
125
+ let Self { client, mut write_requests, table_name, policy, condition } = self ;
126
+ let policy: :: raiden:: RetryPolicy = policy. into( ) ;
127
+
119
128
// TODO: set the number of retry to 5 for now, which should be made more flexible
120
129
const RETRY : usize = 5 ;
121
130
const MAX_ITEMS_PER_REQUEST : usize = 25 ;
122
131
123
132
for _ in 0 ..RETRY {
124
133
loop {
125
- let len = self . write_requests. len( ) ;
134
+ let len = write_requests. len( ) ;
126
135
127
136
// len == 0 means there are no items to be processed anymore
128
137
if len == 0 {
@@ -132,16 +141,24 @@ pub(crate) fn expand_batch_delete(
132
141
let start = len. saturating_sub( MAX_ITEMS_PER_REQUEST ) ;
133
142
let end = std:: cmp:: min( len, start + MAX_ITEMS_PER_REQUEST ) ;
134
143
// take requests up to 25 from the request buffer
135
- let req = self . write_requests. drain( start..end) . collect:: <std:: vec:: Vec <_>>( ) ;
136
- let request_items = vec![ ( self . table_name. clone( ) , req) ]
144
+ let req = write_requests. drain( start..end) . collect:: <std:: vec:: Vec <_>>( ) ;
145
+ let request_items = vec![ ( table_name. clone( ) , req) ]
137
146
. into_iter( )
138
147
. collect:: <std:: collections:: HashMap <_, _>>( ) ;
139
- let input = :: raiden:: BatchWriteItemInput {
140
- request_items,
141
- ..std:: default :: Default :: default ( )
142
- } ;
148
+ let result = {
149
+ let t = table_name. clone( ) ;
150
+ let c = client. clone( ) ;
151
+ let i = :: raiden:: BatchWriteItemInput {
152
+ request_items,
153
+ ..std:: default :: Default :: default ( )
154
+ } ;
143
155
144
- let result = #call_inner_run;
156
+ policy. retry_if( move || {
157
+ let ( table_name, client, input)
158
+ = ( t. clone( ) , c. clone( ) , i. clone( ) ) ;
159
+ async move { #call_inner_run }
160
+ } , condition) . await ?
161
+ } ;
145
162
146
163
let mut unprocessed_items = match result. unprocessed_items {
147
164
None => {
@@ -161,16 +178,16 @@ pub(crate) fn expand_batch_delete(
161
178
} ;
162
179
163
180
let unprocessed_requests = unprocessed_items
164
- . remove( & self . table_name)
165
- . expect( "reqeust_items hashmap must have a value for the table name" ) ;
181
+ . remove( & table_name)
182
+ . expect( "request_items hashmap must have a value for the table name" ) ;
166
183
// push unprocessed requests back to the request buffer
167
- self . write_requests. extend( unprocessed_requests) ;
184
+ write_requests. extend( unprocessed_requests) ;
168
185
}
169
186
}
170
187
171
188
// when retry is done the specified times, treat it as success even if there are
172
189
// still unprocessed items
173
- let unprocessed_items = self . write_requests
190
+ let unprocessed_items = write_requests
174
191
. into_iter( )
175
192
. filter_map( |write_request| write_request. delete_request)
176
193
. collect:: <std:: vec:: Vec <_>>( ) ;
@@ -182,7 +199,7 @@ pub(crate) fn expand_batch_delete(
182
199
183
200
async fn inner_run(
184
201
#inner_run_args
185
- client: & :: raiden:: DynamoDbClient ,
202
+ client: :: raiden:: DynamoDbClient ,
186
203
input: :: raiden:: BatchWriteItemInput ,
187
204
) -> Result <:: raiden:: BatchWriteItemOutput , :: raiden:: RaidenError > {
188
205
Ok ( #api_call_token?)
0 commit comments