@@ -8,6 +8,7 @@ use std::thread;
8
8
use std:: { sync:: Arc , task:: Poll } ;
9
9
10
10
use futures:: { future:: BoxFuture , AsyncWrite , FutureExt } ;
11
+ use masker:: { ChunkMasker , Masker } ;
11
12
use reqwest:: Body ;
12
13
use tokio:: fs:: File as AsyncFile ;
13
14
use tokio:: sync:: mpsc:: { self , error:: SendError } ;
@@ -71,6 +72,7 @@ fn zip_thread(mut temp: File, mut rx: mpsc::Receiver<UploadRequest>) {
71
72
pub struct UploadFile < ' a > {
72
73
tx : & ' a mpsc:: Sender < UploadRequest > ,
73
74
state : UploadFileState < ' a > ,
75
+ masker : Option < ChunkMasker < ' a > > ,
74
76
}
75
77
76
78
impl < ' a > AsyncWrite for UploadFile < ' a > {
@@ -84,10 +86,12 @@ impl<'a> AsyncWrite for UploadFile<'a> {
84
86
match this. state {
85
87
UploadFileState :: Idle => {
86
88
let ( tx, rx) = oneshot:: channel ( ) ;
87
- let send = this
88
- . tx
89
- . send ( UploadRequest :: WriteData ( Vec :: from ( buf) , tx) )
90
- . boxed ( ) ;
89
+ let buf = if let Some ( masker) = & mut this. masker {
90
+ masker. mask_chunk ( buf)
91
+ } else {
92
+ Vec :: from ( buf)
93
+ } ;
94
+ let send = this. tx . send ( UploadRequest :: WriteData ( buf, tx) ) . boxed ( ) ;
91
95
this. state = UploadFileState :: Writing ( Some ( send) , rx)
92
96
}
93
97
UploadFileState :: Writing ( ref mut send, ref mut rx) => {
@@ -114,9 +118,33 @@ impl<'a> AsyncWrite for UploadFile<'a> {
114
118
115
119
fn poll_close (
116
120
self : std:: pin:: Pin < & mut Self > ,
117
- _cx : & mut std:: task:: Context < ' _ > ,
121
+ cx : & mut std:: task:: Context < ' _ > ,
118
122
) -> std:: task:: Poll < std:: io:: Result < ( ) > > {
119
- Poll :: Ready ( Ok ( ( ) ) )
123
+ let this = self . get_mut ( ) ;
124
+ if let Some ( masker) = this. masker . take ( ) {
125
+ let ( tx, mut rx) = oneshot:: channel ( ) ;
126
+ let buf = masker. finish ( ) ;
127
+ let mut send = Some ( this. tx . send ( UploadRequest :: WriteData ( buf, tx) ) . boxed ( ) ) ;
128
+
129
+ loop {
130
+ if let Some ( mut f) = send {
131
+ // Phase 1: Waiting for the send to the writer
132
+ // thread to complete.
133
+
134
+ // TODO error handling
135
+ let _r = futures:: ready!( f. as_mut( ) . poll( cx) ) ;
136
+ send = None ;
137
+ } else {
138
+ // Phase 2: Waiting for the writer thread to
139
+ // signal write completion.
140
+
141
+ let _r = futures:: ready!( Pin :: new( & mut rx) . poll( cx) ) ;
142
+ return Poll :: Ready ( Ok ( ( ) ) ) ;
143
+ }
144
+ }
145
+ } else {
146
+ Poll :: Ready ( Ok ( ( ) ) )
147
+ }
120
148
}
121
149
}
122
150
@@ -125,20 +153,27 @@ pub struct Uploader {
125
153
client : Client ,
126
154
data : Arc < JobResponse > ,
127
155
tx : mpsc:: Sender < UploadRequest > ,
156
+ masker : Masker ,
128
157
}
129
158
130
159
impl Uploader {
131
160
pub ( crate ) fn new (
132
161
client : Client ,
133
162
build_dir : & Path ,
134
163
data : Arc < JobResponse > ,
164
+ masker : Masker ,
135
165
) -> Result < Self , ( ) > {
136
166
let temp = tempfile:: tempfile_in ( build_dir)
137
167
. map_err ( |e| warn ! ( "Failed to create artifacts temp file: {:?}" , e) ) ?;
138
168
139
169
let ( tx, rx) = mpsc:: channel ( 2 ) ;
140
170
thread:: spawn ( move || zip_thread ( temp, rx) ) ;
141
- Ok ( Self { client, data, tx } )
171
+ Ok ( Self {
172
+ client,
173
+ data,
174
+ tx,
175
+ masker,
176
+ } )
142
177
}
143
178
144
179
/// Create a new file to be uploaded
@@ -149,6 +184,20 @@ impl Uploader {
149
184
. expect ( "Failed to create file" ) ;
150
185
UploadFile {
151
186
tx : & self . tx ,
187
+ masker : None ,
188
+ state : UploadFileState :: Idle ,
189
+ }
190
+ }
191
+
192
+ /// Create a new file to be uploaded, which will be masked
193
+ pub async fn masked_file ( & mut self , name : String ) -> UploadFile < ' _ > {
194
+ self . tx
195
+ . send ( UploadRequest :: NewFile ( name) )
196
+ . await
197
+ . expect ( "Failed to create file" ) ;
198
+ UploadFile {
199
+ tx : & self . tx ,
200
+ masker : Some ( self . masker . mask_chunks ( ) ) ,
152
201
state : UploadFileState :: Idle ,
153
202
}
154
203
}
0 commit comments