libzypp 17.31.7
rangedownloader_p.cc
/*---------------------------------------------------------------------\
|                          ____ _   __ __ ___                          |
|                         |__  / \ / / . \ . \                         |
|                           / / \ V /|  _/  _/                         |
|                          / /__ | | | | | |                           |
|                         /_____||_| |_| |_|                           |
|                                                                      |
\---------------------------------------------------------------------*/

#include <zypp-core/AutoDispose.h>
#include <zypp-core/fs/PathInfo.h>

#include "rangedownloader_p.h"

namespace zyppng {

  void RangeDownloaderBaseState::onRequestStarted( NetworkRequest & )
  { }

  void RangeDownloaderBaseState::onRequestProgress( NetworkRequest &, off_t, off_t, off_t, off_t )
  {
    off_t dlnowMulti = _downloadedMultiByteCount;
    for ( const auto &req : _runningRequests ) {
      dlnowMulti += req->downloadedByteCount();
    }

    if ( !assertExpectedFilesize( dlnowMulti ) )
      return;

    stateMachine()._sigProgress.emit( *stateMachine().z_func(), _fileSize, dlnowMulti );
  }
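
  // For example: with 6 MiB worth of ranges already finished (_downloadedMultiByteCount)
  // and two running requests currently reporting 1 MiB and 0.5 MiB, the progress signal
  // above would report 7.5 MiB of _fileSize.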

  void RangeDownloaderBaseState::onRequestFinished( NetworkRequest &req, const NetworkRequestError &err )
  {
    auto lck = stateMachine().z_func()->shared_from_this();
    auto it = std::find_if( _runningRequests.begin(), _runningRequests.end(), [ &req ]( const std::shared_ptr<Request> &r ) {
      return ( r.get() == &req );
    });
    if ( it == _runningRequests.end() )
      return;

    auto reqLocked = *it;

    // remove from running
    _runningRequests.erase( it );

    // feed the working URL back into the mirrors in case there are still running requests that might fail
    // @TODO: finishing the transfer might never be called in case of cancelling the request, need a better way to track running transfers
    if ( reqLocked->_myMirror )
      reqLocked->_myMirror->finishTransfer( !err.isError() );

    if ( err.isError() ) {
      return handleRequestError( reqLocked, err );
    }

    _downloadedMultiByteCount += reqLocked->downloadedByteCount();
    if ( !assertExpectedFilesize( _downloadedMultiByteCount ) ) {
      return;
    }

    MIL_MEDIA << "Request finished " << std::endl;
    const auto &rngs = reqLocked->requestedRanges();
    std::for_each( rngs.begin(), rngs.end(), []( const auto &b ){ DBG << "-> Block " << b.start << " finished." << std::endl; } );

    auto restartReqWithBlock = [ this ]( std::shared_ptr<Request> &req, std::vector<Block> &&blocks ) {
      MIL_MEDIA << "Reusing Request to download blocks:" << std::endl;
      if ( !addBlockRanges( req, std::move( blocks ) ) )
        return false;

      // this is not a new request, only add to queues but do not connect signals again
      addNewRequest( req, false );
      return true;
    };

    // check if we have already enqueued all blocks, if not reuse the request
    if ( _ranges.size() ) {
      MIL_MEDIA << "Reusing to download blocks: " << std::endl;
      if ( !restartReqWithBlock( reqLocked, getNextBlocks( reqLocked->url().getScheme() ) ) ) {
        return setFailed( "Failed to restart request with new blocks." );
      }
      return;

    } else {
      // if we have failed blocks, try to download them with this mirror
      if ( !_failedRanges.empty() ) {

        auto fblks = getNextFailedBlocks( reqLocked->url().getScheme() );
        MIL_MEDIA << "Reusing to download failed blocks: " << std::endl;
        if ( !restartReqWithBlock( reqLocked, std::move(fblks) ) ) {
          return setFailed( "Failed to restart request with previously failed blocks." );
        }
        return;
      }
    }

    // feed the working URL back into the mirrors in case there are still running requests that might fail
    _fileMirrors.push_back( reqLocked->_originalUrl );

    // make sure downloads are running at this point
    ensureDownloadsRunning();
  }

  void RangeDownloaderBaseState::handleRequestError( std::shared_ptr<Request> req, const zyppng::NetworkRequestError &err )
  {
    bool retry = false;
    auto &parent = stateMachine();

    // handle the auth errors explicitly, we need to give the user a way to put in new credentials,
    // if we get valid new credentials we can retry the request
    if ( err.type() == NetworkRequestError::Unauthorized || err.type() == NetworkRequestError::AuthFailed ) {
      retry = parent.handleRequestAuthError( req, err );
    } else {

      // if an error happens during a multi download we try to use another mirror to download the failed block
      MIL << "Request failed " << req->extendedErrorString() << "(" << req->url() << ")" << std::endl;

      NetworkRequestError dummyErr;

      const auto &fRanges = req->failedRanges();
      try {
        std::transform( fRanges.begin(), fRanges.end(), std::back_inserter(_failedRanges), [ &req ]( const auto &r ){
          Block b = std::any_cast<Block>(r.userData);
          b._failedWithErr = req->error();
          if ( zypp::env::ZYPP_MEDIA_CURL_DEBUG() > 3 )
            DBG_MEDIA << "Adding failed block to failed blocklist: " << b.start << " " << b.len << " (" << req->error().toString() << " [" << req->error().nativeErrorString() << "])" << std::endl;
          return b;
        });

        // try to fill the open spot right away
        ensureDownloadsRunning();
        return;

      } catch ( const zypp::Exception &ex ) {
        // we just log the exception and fall back to a normal download
        WAR << "Multipart download failed: " << ex.asString() << std::endl;
      }
    }

    // if retry is true we just enqueue the request again, usually this means authentication was updated
    if ( retry ) {
      // make sure this request will run asap
      req->setPriority( parent._defaultSubRequestPriority );

      // this is not a new request, only add to queues but do not connect signals again
      addNewRequest( req, false );
      return;
    }

    // we do not have any more mirrors left to try
    cancelAll( err );

    // not all hope is lost, maybe a normal download can work out?
    // fall back to normal download
    _sigFailed.emit();
  }
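
  // In short: authentication failures are retried on the same request once new credentials
  // were supplied, any other error moves the failed ranges into _failedRanges so another
  // mirror can pick them up, and only when no mirror is left does the state emit _sigFailed
  // so the owning download can fall back to a plain, non-ranged download.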

  void RangeDownloaderBaseState::ensureDownloadsRunning()
  {
    if ( _inEnsureDownloadsRunning )
      return;

    zypp::OnScopeExit clearFlag( [this]() {
      _inEnsureDownloadsRunning = false;
    });

    _inEnsureDownloadsRunning = true;

    // check if there is still work to do
    while ( _ranges.size() || _failedRanges.size() ) {

      // download was already finished
      if ( _error.isError() )
        return;

      if ( _runningRequests.size() >= 10 )
        break;

      // prepareNextMirror will automatically call mirrorReceived() once there is a mirror ready
      const auto &res = prepareNextMirror();
      // if mirrors are delayed we stop here, once the mirrors are ready we get called again
      if ( res == MirrorHandlingStateBase::Delayed )
        return;
      else if ( res == MirrorHandlingStateBase::Failed ) {
        return;
      }
    }

    // check if we are done at this point
    if ( _runningRequests.empty() ) {

      if ( _failedRanges.size() || _ranges.size() ) {
        return;
      }

      // seems we were successful, transition to finished state
      setFinished();
    }
  }
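
  // Scheduling sketch: while blocks are pending, at most 10 ranged requests are kept in
  // flight, each one seeded with roughly _preferredChunkSize worth of blocks from a freshly
  // picked mirror. For a 100 MiB file cut into 10 MiB chunks this can mean up to 10 requests
  // running in parallel against different mirrors.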

  void RangeDownloaderBaseState::mirrorReceived( MirrorControl::MirrorPick mirror )
  {
    auto &parent = stateMachine();
    Url myUrl;
    TransferSettings settings;

    auto err = setupMirror( mirror, myUrl, settings );
    if ( err.isError() ) {
      WAR << "Failure to setup mirror " << myUrl << " with error " << err.toString() << "(" << err.nativeErrorString() << "), dropping it from the list of mirrors." << std::endl;
      // if a mirror fails, we remove it from our list
      _fileMirrors.erase( mirror.first );

      // make sure this is retried
      ensureDownloadsRunning();
      return;
    }

    auto blocks = getNextBlocks( myUrl.getScheme() );
    if ( !blocks.size() )
      blocks = getNextFailedBlocks( myUrl.getScheme() );

    if ( !blocks.size() ) {
      // We have no blocks. In theory, that should never happen, but for safety, we error out here. It is better than
      // getting stuck.
      setFailed( NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, "Mirror requested after all blocks were downloaded." ) );
      return;
    }

    const auto &spec = parent._spec;

    std::shared_ptr<Request> req = std::make_shared<Request>( ::internal::clearQueryString( myUrl ), spec.targetPath(), NetworkRequest::WriteShared );
    req->_myMirror = mirror.second;
    req->_originalUrl = myUrl;
    req->setPriority( parent._defaultSubRequestPriority );
    req->transferSettings() = settings;

    // when downloading in chunks we do not want to wait too long on mirrors that show slow activity
    // note: this sets the activity timeout, not the download timeout
    req->transferSettings().setTimeout( 2 );

    DBG_MEDIA << "Creating Request to download blocks:" << std::endl;
    if ( !addBlockRanges( req, std::move(blocks) ) ) {
      return;
    }

    // we just use a mirror once per file, remove it from the list
    _fileMirrors.erase( mirror.first );

    addNewRequest( req );

    // trigger next downloads
    ensureDownloadsRunning();
  }

  void RangeDownloaderBaseState::failedToPrepare()
  {
    // it was impossible to find a new mirror, check if we still have running requests we can wait for, if not
    // we can only fail at this point
    if ( !_runningRequests.size() ) {
    }
  }

  void RangeDownloaderBaseState::reschedule()
  {
    bool triggerResched = false;
    for ( auto &req : _runningRequests ) {
      if ( req->state() == NetworkRequest::Pending ) {
        triggerResched = true;
        req->setPriority( NetworkRequest::Critical, false );
      }
    }
    if ( triggerResched )
      stateMachine()._requestDispatcher->reschedule();
  }

  void RangeDownloaderBaseState::addNewRequest( std::shared_ptr<Request> req, const bool connectSignals )
  {
    if ( connectSignals )
      req->connectSignals( *this );

    _runningRequests.push_back( req );
    stateMachine()._requestDispatcher->enqueue( req );

    if ( req->_myMirror )
      req->_myMirror->startTransfer();
  }

  bool RangeDownloaderBaseState::assertExpectedFilesize( off_t currentFilesize )
  {
    const off_t expFSize = stateMachine()._spec.expectedFileSize();
    if ( expFSize > 0 && expFSize < currentFilesize ) {
      return false;
    }
    return true;
  }
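
  // Example: if the download spec announces an expected size of 10 MiB but the ranges already
  // account for 11 MiB, assertExpectedFilesize() returns false and the callers above return
  // early instead of reporting progress past the expected end of the file.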

  /*!
   * Just initialize the requests ranges from the internal blocklist
   */
  bool RangeDownloaderBaseState::addBlockRanges( std::shared_ptr<Request> req, std::vector<Block> &&blocks ) const
  {
    req->resetRequestRanges();
    for ( const auto &block : blocks ) {
      if ( block.chksumVec && block.chksumtype.size() ) {
        std::shared_ptr<zypp::Digest> dig = std::make_shared<zypp::Digest>();
        if ( !dig->create( block.chksumtype ) ) {
          WAR_MEDIA << "Trying to create Digest with chksum type " << block.chksumtype << " failed " << std::endl;
          return false;
        }

        DBG_MEDIA << "Starting block " << block.start << " with checksum " << zypp::Digest::digestVectorToString( *block.chksumVec ) << "." << std::endl;
        req->addRequestRange( block.start, block.len, dig, *block.chksumVec, std::any( block ), block.chksumCompareLen, block.chksumPad );
      } else {

        DBG_MEDIA << "Starting block " << block.start << " without checksum." << std::endl;
        req->addRequestRange( block.start, block.len, {}, {}, std::any( block ) );
      }
    }
    return true;
  }
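
  // Each block may carry an expected digest (e.g. per-block checksums from metalink metadata);
  // the digest object is handed to req->addRequestRange() together with the expected checksum
  // vector, so a range that fails to verify can later show up in req->failedRanges() and be
  // retried on another mirror.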

  void RangeDownloaderBaseState::setFailed( NetworkRequestError &&err )
  {
    _error = std::move( err );
    cancelAll( _error );
    zypp::filesystem::unlink( stateMachine()._spec.targetPath() );
    _sigFailed.emit();
  }

  void RangeDownloaderBaseState::setFailed( std::string &&reason )
  {
    setFailed( NetworkRequestErrorPrivate::customError( NetworkRequestError::InternalError, std::move(reason) ) );
  }

  void RangeDownloaderBaseState::setFinished()
  {
    _sigFinished.emit();
  }

  void RangeDownloaderBaseState::cancelAll( const NetworkRequestError &err )
  {
    while ( _runningRequests.size() ) {
      auto req = _runningRequests.back();
      req->disconnectSignals();
      _runningRequests.pop_back();
      stateMachine()._requestDispatcher->cancel( *req, err );
      if ( req->_myMirror )
        req->_myMirror->cancelTransfer();
    }
  }

  std::vector<RangeDownloaderBaseState::Block> RangeDownloaderBaseState::getNextBlocks( const std::string &urlScheme )
  {
    std::vector<Block> blocks;
    const auto &prefSize = std::max<zypp::ByteCount>( _preferredChunkSize, zypp::ByteCount(4, zypp::ByteCount::K) );
    size_t accumulatedSize = 0;

    bool canDoRandomBlocks = ( zypp::str::hasPrefixCI( urlScheme, "http" ) );

    std::optional<size_t> lastBlockEnd;
    while ( _ranges.size() && accumulatedSize < prefSize ) {
      const auto &r = _ranges.front();

      if ( !canDoRandomBlocks && lastBlockEnd ) {
        if ( static_cast<const size_t>(r.start) != (*lastBlockEnd)+1 )
          break;
      }

      lastBlockEnd = r.start + r.len - 1;
      accumulatedSize += r.len;

      blocks.push_back( std::move( _ranges.front() ) );
      _ranges.pop_front();
    }
    DBG_MEDIA << "Accumulated " << blocks.size() << " blocks with accumulated size of: " << accumulatedSize << "." << std::endl;
    return blocks;
  }
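
  // Example: with a preferred chunk size of 4 MiB and 256 KiB ranges, up to 16 ranges are
  // batched into a single request. For non-HTTP schemes only consecutive ranges are batched,
  // since those backends cannot request arbitrary ranges in one transfer.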

  std::vector<RangeDownloaderBaseState::Block> RangeDownloaderBaseState::getNextFailedBlocks( const std::string &urlScheme )
  {
    const auto &prefSize = std::max<zypp::ByteCount>( _preferredChunkSize, zypp::ByteCount(4, zypp::ByteCount::K) );
    // sort the failed requests by block number, this should make sure we get them in offset order as well
    _failedRanges.sort( []( const auto &a, const auto &b ){ return a.start < b.start; } );

    bool canDoRandomBlocks = ( zypp::str::hasPrefixCI( urlScheme, "http" ) );

    std::vector<Block> fblks;
    std::optional<size_t> lastBlockEnd;
    size_t accumulatedSize = 0;
    while ( _failedRanges.size() ) {

      const auto &block = _failedRanges.front();

      // we need to check if we have consecutive blocks because only http mirrors support random request ranges
      if ( !canDoRandomBlocks && lastBlockEnd ) {
        if ( static_cast<const size_t>(block.start) != (*lastBlockEnd)+1 )
          break;
      }

      lastBlockEnd = block.start + block.len - 1;
      accumulatedSize += block.len;

      fblks.push_back( std::move( _failedRanges.front() ) );
      _failedRanges.pop_front();

      fblks.back()._retryCount += 1;

      if ( accumulatedSize >= prefSize )
        break;
    }

    return fblks;
  }

  zypp::ByteCount RangeDownloaderBaseState::makeBlksize( size_t filesize )
  {
    // this case should never happen because we never start a multi download if we do not know the filesize beforehand
    if ( filesize == 0 ) return 4 * 1024 * 1024;
    else if ( filesize < 4*1024*1024 ) return filesize;
    else if ( filesize < 8*1024*1024 ) return 4*1024*1024;
    else if ( filesize < 16*1024*1024 ) return 8*1024*1024;
    else if ( filesize < 256*1024*1024 ) return 10*1024*1024;
    return 4*1024*1024;
  }
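
  // Resulting chunk sizes, for example: a 3 MiB file is fetched as a single block, a 12 MiB
  // file in 8 MiB blocks, a 100 MiB file in 10 MiB blocks, and files of 256 MiB or more fall
  // back to 4 MiB blocks.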
}