libzypp 17.35.13
MediaMultiCurl.cc
1/*---------------------------------------------------------------------\
2|                          ____ _   __ __ ___                          |
3|                         |__  / \ / / . \ . \                         |
4|                           / / \ V /|  _/  _/                         |
5|                          / /__ | | | | | |                           |
6|                         /_____||_| |_| |_|                           |
7|                                                                      |
8\---------------------------------------------------------------------*/
13#include <ctype.h>
14#include <sys/types.h>
15#include <signal.h>
16#include <sys/wait.h>
17#include <netdb.h>
18#include <arpa/inet.h>
19#include <glib.h>
20
21#include <utility>
22#include <vector>
23#include <iostream>
24#include <algorithm>
25
26
27#include <zypp/ManagedFile.h>
28#include <zypp/ZConfig.h>
29#include <zypp/base/Logger.h>
33#include <zypp-curl/parser/MetaLinkParser>
36#include <zypp-curl/auth/CurlAuthData>
39
40using std::endl;
41using namespace zypp::base;
42
43#undef CURLVERSION_AT_LEAST
44#define CURLVERSION_AT_LEAST(M,N,O) LIBCURL_VERSION_NUM >= ((((M)<<8)+(N))<<8)+(O)
45
46namespace zypp {
47 namespace media {
48
50
51
115
116struct Stripe {
117
118 enum RState {
119 PENDING, //< Pending Range
120 FETCH, //< Fetch is running!
121 COMPETING, //< Competing workers, needs checksum recheck
122 FINALIZED, //< Done, don't write to it anymore
123 REFETCH //< This block needs a refetch
124 };
125
126 std::vector<off_t> blocks; //< required block numbers from blocklist
127 std::vector<RState> blockStates; //< current state of each block in blocks
128};
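The RState values above form a small per-block state machine, driven by multifetchworker::beginRange() and multifetchworker::finishedRange() further down: a PENDING block moves to FETCH when the first worker claims it, to COMPETING when a second worker starts writing to it, and finally to FINALIZED once its checksum verified, or to REFETCH when competing writers left broken data behind. A minimal sketch of the transitions, assuming hypothetical helper names (onBeginRange/onFinishedRange are illustrative, not part of this file):

// Illustrative sketch only: mirrors the transitions implemented by
// multifetchworker::beginRange()/finishedRange() below.
Stripe::RState onBeginRange( Stripe::RState s )
{
  // first claim starts the fetch; a second claim means two workers compete
  return s == Stripe::PENDING ? Stripe::FETCH : Stripe::COMPETING;
}
Stripe::RState onFinishedRange( Stripe::RState s, bool checksumOk )
{
  if ( s == Stripe::FETCH )          // we were the only writer, data already validated
    return Stripe::FINALIZED;
  return checksumOk ? Stripe::FINALIZED  // competing writers, recheck passed
                    : Stripe::REFETCH;   // competing writers corrupted the block
}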
129
139
140// Hack: we derive from MediaCurl just to get the storage space for
141// settings, url, curlerrors and the like
142class multifetchworker : public MediaCurl {
143 friend class multifetchrequest;
144
145public:
146 multifetchworker(int no, multifetchrequest &request, const Url &url);
151 ~multifetchworker() override;
152
157 void nextjob();
158
163 void runjob();
164
170 bool continueJob();
171
176 bool recheckChecksum( off_t blockIdx );
177
181 void disableCompetition();
182
186 void checkdns();
187 void adddnsfd( std::vector<GPollFD> &waitFds );
188 void dnsevent(const std::vector<GPollFD> &waitFds );
189
190 const int _workerno;
191
192 MultiFetchWorkerState _state = WORKER_STARTING; //< current state of this worker
193 bool _competing = false;
194
195 std::vector<MultiByteHandler::Range> _blocks;
196 std::vector<off_t> _rangeToStripeBlock;
197
199 std::unique_ptr<MultiByteHandler> _multiByteHandler;
200
201 off_t _stripe = 0; //< The current stripe we are downloading
202 size_t _datasize = 0; //< The nr of bytes we need to download overall
203
204 double _starttime = 0; //< When was the current job started
205 size_t _datareceived = 0; //< Data downloaded in the current job only
206 off_t _received = 0; //< Overall data fetched by this worker
207
208 double _avgspeed = 0;
209 double _maxspeed = 0;
210
211 double _sleepuntil = 0;
212
213private:
214 void run();
215 void stealjob();
216 bool setupHandle();
217 MultiByteHandler::Range rangeFromBlock( off_t blockNo ) const;
218
219 size_t writefunction ( char *ptr, std::optional<off_t> offset, size_t bytes ) override;
220 size_t headerfunction ( char *ptr, size_t bytes ) override;
221 bool beginRange ( off_t range, std::string &cancelReason ) override;
222 bool finishedRange ( off_t range, bool validated, std::string &cancelReason ) override;
223
224 multifetchrequest *_request = nullptr;
225 int _pass = 0;
226 std::string _urlbuf;
227
228 pid_t _pid = 0;
229 int _dnspipe = -1;
230};
231
232class multifetchrequest : public internal::CurlPollHelper::CurlPoll {
233public:
234 multifetchrequest(const MediaMultiCurl *context, Pathname filename,
235 Url baseurl, CURLM *multi, FILE *fp,
237 MediaBlockList &&blklist, off_t filesize);
243
244 void run(std::vector<Url> &urllist);
245 static ByteCount makeBlksize( uint maxConns, size_t filesize );
246
247 MediaBlockList &blockList() {
248 return _blklist;
249 }
250
251protected:
252 friend class multifetchworker;
253
254 const MediaMultiCurl *_context = nullptr;
255 Pathname _filename;
256 Url _baseurl;
257
258 FILE *_fp = nullptr;
259 callback::SendReport<DownloadProgressReport> *_report = nullptr;
260 MediaBlockList _blklist;
261
262 std::vector<Stripe> _requiredStripes; // all the data we need
263
264 off_t _filesize = 0; //< size of the file we want to download
265
266 std::list< std::unique_ptr<multifetchworker> > _workers;
267 bool _stealing = false;
268 bool _havenewjob = false;
269
270 zypp::ByteCount _defaultBlksize = 0; //< The blocksize to use if the metalink file does not specify one
271 off_t _stripeNo = 0; //< next stripe to download
272
273 size_t _activeworkers = 0;
274 size_t _lookupworkers = 0;
275 size_t _sleepworkers = 0;
276 double _minsleepuntil = 0;
277 bool _finished = false;
278
279 off_t _totalsize = 0; //< nr of bytes we need to download ( e.g. filesize - ( bytes reused from deltafile ) )
280 off_t _fetchedsize = 0;
281 off_t _fetchedgoodsize = 0;
282
283 double _starttime = 0;
284 double _lastprogress = 0;
285
286 double _lastperiodstart = 0;
287 double _lastperiodfetched = 0;
288 double _periodavg = 0;
289
290public:
291 double _timeout = 0;
292 double _connect_timeout = 0;
293 double _maxspeed = 0;
294 int _maxworkers = 0;
295};
296
297constexpr auto MIN_REQ_MIRRS = 4;
298constexpr auto MAXURLS = 10;
299
300// TCP communication scales up as a connection proceeds. This is due to TCP slow start, where
301// the congestion window scales up. The stripe calculation assumes that every package can be fairly
302// downloaded from multiple mirrors, but omits that attempting to download, say, 1MB from 4 mirrors
303// means 4 requests of 256k, where then you have four congestion windows that need to increase,
304// meaning the overall download speed is significantly lower. Counterintuitively, this leads to
305// cases where *more* mirrors being available to zypper significantly lowers performance.
306//
307// Instead, there should be a minimum stripe size cap. This way any item smaller than the value
308// is downloaded in a single request, whereas larger items are downloaded from many mirrors,
309// but each range has enough time to increase its congestion window to something reasonable.
310//
311// Initial value: 4 MB.
312constexpr auto MIN_STRIPE_SIZE_KB = 4096;
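To illustrate the cap, here is a simplified, self-contained sketch of the stripe-building loop from the multifetchrequest constructor further down (the function name and block sizes are hypothetical):

#include <cstddef>
#include <vector>

// Group consecutive blocks into stripes of at least stripeSize bytes,
// mirroring the loop in the multifetchrequest constructor.
std::vector<std::vector<size_t>> buildStripes( const std::vector<size_t> &blockSizes, size_t stripeSize )
{
  std::vector<std::vector<size_t>> stripes;
  size_t currStripeSize = 0;
  for ( size_t blkno = 0; blkno < blockSizes.size(); blkno++ ) {
    if ( stripes.empty() || currStripeSize >= stripeSize ) {
      stripes.push_back( {} ); // start a new stripe
      currStripeSize = 0;
    }
    stripes.back().push_back( blkno );
    currStripeSize += blockSizes[blkno];
  }
  return stripes;
}

E.g. sixteen 1 MB blocks with a 4 MB minimum stripe size yield four stripes of four blocks each; each stripe is then fetched by a single worker/mirror, giving every connection time to grow its congestion window.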
313
315
316static double
317currentTime()
318{
319#if _POSIX_C_SOURCE >= 199309L
320 struct timespec ts;
321 if ( clock_gettime( CLOCK_MONOTONIC, &ts) )
322 return 0;
323 return ts.tv_sec + ts.tv_nsec / 1000000000.;
324#else
325 struct timeval tv;
326 if (gettimeofday(&tv, NULL))
327 return 0;
328 return tv.tv_sec + tv.tv_usec / 1000000.;
329#endif
330}
331
332size_t
333multifetchworker::writefunction(char *ptr, std::optional<off_t> offset, size_t bytes)
334{
335 if (_state == WORKER_BROKEN)
336 return bytes ? 0 : 1;
337
338 double now = currentTime();
339
340 // update stats of overall data
341 _datareceived += bytes;
342 _received += bytes;
343 _request->_lastprogress = now;
344
345 const auto &currRange = _multiByteHandler->currentRange();
346 if (!currRange)
347 return 0; // we always write to a range
348
349 auto &stripeDesc = _request->_requiredStripes[_stripe];
350 if ( !_request->_fp || stripeDesc.blockStates[ _rangeToStripeBlock[*currRange] ] == Stripe::FINALIZED ) {
351 // someone else finished our block first!
352 // we stop here and fetch new jobs if there are still some
354 _competing = false;
355 return 0;
356 }
357
358 const auto &blk = _blocks[*currRange];
359 off_t seekTo = blk.start + blk.bytesWritten;
360
361 if ( ftell( _request->_fp ) != seekTo ) {
362 // if we can't seek the file there is no purpose in trying again
363 if (fseeko(_request->_fp, seekTo, SEEK_SET))
364 return bytes ? 0 : 1;
365 }
366
367 size_t cnt = fwrite(ptr, 1, bytes, _request->_fp);
368 _request->_fetchedsize += cnt;
369 return cnt;
370}
371
372bool multifetchworker::beginRange ( off_t workerRangeOff, std::string &cancelReason )
373{
374 auto &stripeDesc = _request->_requiredStripes[_stripe];
375 auto stripeRangeOff = _rangeToStripeBlock[workerRangeOff];
376 const auto &currRangeState = stripeDesc.blockStates[stripeRangeOff];
377
378 if ( currRangeState == Stripe::FINALIZED ){
379 cancelReason = "Cancelled because stripe block is already finalized";
381 WAR << "#" << _workerno << ": trying to start a range ("<<stripeRangeOff<<"["<< _blocks[workerRangeOff].start <<" : "<<_blocks[workerRangeOff].len<<"]) that was already finalized, cancelling. Stealing was: " << _request->_stealing << endl;
382 return false;
383 }
384 stripeDesc.blockStates[stripeRangeOff] = currRangeState == Stripe::PENDING ? Stripe::FETCH : Stripe::COMPETING;
385 return true;
386}
387
388bool multifetchworker::finishedRange ( off_t workerRangeOff, bool validated, std::string &cancelReason )
389{
390 auto &stripeDesc = _request->_requiredStripes[_stripe];
391 auto stripeRangeOff = _rangeToStripeBlock[workerRangeOff];
392 const auto &currRangeState = stripeDesc.blockStates[stripeRangeOff];
393
394 if ( !validated ) {
395 // fail, worker will go into WORKER_BROKEN
396 cancelReason = "Block failed to validate";
397 return false;
398 }
399
400 if ( currRangeState == Stripe::FETCH ) {
401 // only us who wrote here, block is finalized
402 stripeDesc.blockStates[stripeRangeOff] = Stripe::FINALIZED;
403 _request->_fetchedgoodsize += _blocks[workerRangeOff].len;
404 } else {
405 // others wrote here, we need to check the full checksum
406 if ( recheckChecksum ( workerRangeOff ) ) {
407 stripeDesc.blockStates[stripeRangeOff] = Stripe::FINALIZED;
408 _request->_fetchedgoodsize += _blocks[workerRangeOff].len;
409 } else {
410 // someone messed that block up, set it to refetch but continue since our
411 // data is valid
412 WAR << "#" << _workerno << ": Broken data in COMPETING block, requesting refetch. Stealing is: " << _request->_stealing << endl;
413 stripeDesc.blockStates[stripeRangeOff] = Stripe::REFETCH;
414 }
415 }
416 return true;
417}
418
419size_t
420multifetchworker::headerfunction( char *p, size_t bytes )
421{
422 size_t l = bytes;
423 if (l > 9 && !strncasecmp(p, "Location:", 9)) {
424 std::string line(p + 9, l - 9);
425 if (line[l - 10] == '\r')
426 line.erase(l - 10, 1);
427 XXX << "#" << _workerno << ": redirecting to " << line << endl;
428 return bytes;
429 }
430
431 const auto &repSize = _multiByteHandler->reportedFileSize ();
432 if ( repSize && *repSize != _request->_filesize ) {
433 XXX << "#" << _workerno << ": filesize mismatch" << endl;
434 _state = WORKER_BROKEN;
435 setCurlError("filesize mismatch");
436 return 0;
437 }
438
439 return bytes;
440}
441
442multifetchworker::multifetchworker(int no, multifetchrequest &request, const Url &url)
443: MediaCurl(url, Pathname())
444, _workerno( no )
445, _maxspeed( request._maxspeed )
446, _request ( &request )
447{
448 Url curlUrl( clearQueryString(url) );
449 _urlbuf = curlUrl.asString();
450 _curl = _request->_context->fromEasyPool(_url.getHost());
451 if (_curl)
452 XXX << "reused worker from pool" << endl;
453 if (!_curl && !(_curl = curl_easy_init()))
454 {
455 _state = WORKER_BROKEN;
456 setCurlError("curl_easy_init failed");
457 return;
458 }
459
460 if ( url.getScheme() == "http" || url.getScheme() == "https" )
462
463 setupHandle();
464 checkdns();
465}
466
467bool multifetchworker::setupHandle()
468{
469 try {
470 setupEasy();
471 } catch (Exception &ex) {
472 curl_easy_cleanup(_curl);
473 _curl = 0;
474 _state = WORKER_BROKEN;
475 setCurlError("curl_easy_setopt failed");
476 return false;
477 }
478 curl_easy_setopt(_curl, CURLOPT_PRIVATE, this);
479 curl_easy_setopt(_curl, CURLOPT_URL, _urlbuf.c_str());
480
481 // if this is the same host copy authorization
482 // (the host check is also what curl does when doing a redirect)
483 // (note also that unauthorized exceptions are thrown with the request host)
484 if ( _url.getHost() == _request->_context->_url.getHost()) {
485 _settings.setUsername(_request->_context->_settings.username());
486 _settings.setPassword(_request->_context->_settings.password());
487
488 if ( _settings.userPassword().size() ) {
489 curl_easy_setopt(_curl, CURLOPT_USERPWD, _settings.userPassword().c_str());
490 std::string use_auth = _settings.authType();
491 if (use_auth.empty())
492 use_auth = "digest,basic"; // our default
493 long auth = CurlAuthData::auth_type_str2long(use_auth);
494 if( auth != CURLAUTH_NONE)
495 {
496 XXX << "#" << _workerno << ": Enabling HTTP authentication methods: " << use_auth
497 << " (CURLOPT_HTTPAUTH=" << auth << ")" << std::endl;
498 curl_easy_setopt(_curl, CURLOPT_HTTPAUTH, auth);
499 }
500 }
501 }
502 return true;
503}
504
505multifetchworker::~multifetchworker()
506{
507 if (_curl)
508 {
509 if (_state == WORKER_FETCH || _state == WORKER_DISCARD)
510 curl_multi_remove_handle(_request->_multi, _curl);
511 if (_state == WORKER_DONE || _state == WORKER_SLEEP)
512 {
513#if CURLVERSION_AT_LEAST(7,15,5)
514 curl_easy_setopt(_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)0);
515#endif
516 curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
517 curl_easy_setopt(_curl, CURLOPT_WRITEFUNCTION, (void *)0);
518 curl_easy_setopt(_curl, CURLOPT_WRITEDATA, (void *)0);
519 curl_easy_setopt(_curl, CURLOPT_HEADERFUNCTION, (void *)0);
520 curl_easy_setopt(_curl, CURLOPT_HEADERDATA, (void *)0);
521 _request->_context->toEasyPool(_url.getHost(), _curl);
522 }
523 else
524 curl_easy_cleanup(_curl);
525 _curl = 0;
526 }
527 if (_pid)
528 {
529 kill(_pid, SIGKILL);
530 int status = 0;
531 zyppng::eintrSafeCall( waitpid, _pid, &status, 0);
532 _pid = 0;
533 }
534 if (_dnspipe != -1)
535 {
536 close(_dnspipe);
537 _dnspipe = -1;
538 }
539 // the destructor in MediaCurl doesn't call disconnect() if
540 // the media is not attached, so we do it here manually
541 disconnectFrom();
542}
543
544static inline bool env_isset(const std::string& name)
545{
546 const char *s = getenv(name.c_str());
547 return s && *s ? true : false;
548}
549
550void
551multifetchworker::checkdns()
552{
553 std::string host = _url.getHost();
554
555 if (host.empty())
556 return;
557
558 if (_request->_context->isDNSok(host))
559 return;
560
561 // no need to do dns checking for numeric hosts
562 char addrbuf[128];
563 if (inet_pton(AF_INET, host.c_str(), addrbuf) == 1)
564 return;
565 if (inet_pton(AF_INET6, host.c_str(), addrbuf) == 1)
566 return;
567
568 // no need to do dns checking if we use a proxy
569 if (!_settings.proxy().empty())
570 return;
571 if (env_isset("all_proxy") || env_isset("ALL_PROXY"))
572 return;
573 std::string schemeproxy = _url.getScheme() + "_proxy";
574 if (env_isset(schemeproxy))
575 return;
576 if (schemeproxy != "http_proxy")
577 {
578 std::transform(schemeproxy.begin(), schemeproxy.end(), schemeproxy.begin(), ::toupper);
579 if (env_isset(schemeproxy))
580 return;
581 }
582
583 XXX << "checking DNS lookup of " << host << endl;
584 int pipefds[2];
585 if (pipe(pipefds))
586 {
587 _state = WORKER_BROKEN;
588 setCurlError("DNS pipe creation failed");
589 return;
590 }
591 _pid = fork();
592 if (_pid == pid_t(-1))
593 {
594 close(pipefds[0]);
595 close(pipefds[1]);
596 _pid = 0;
597 _state = WORKER_BROKEN;
598 setCurlError("DNS checker fork failed");
599 return;
600 }
601 else if (_pid == 0)
602 {
603 close(pipefds[0]);
604 // XXX: close all other file descriptors
605 struct addrinfo *ai = nullptr, aihints;
606 memset(&aihints, 0, sizeof(aihints));
607 aihints.ai_family = PF_UNSPEC;
608 int tstsock = socket(PF_INET6, SOCK_DGRAM | SOCK_CLOEXEC, 0);
609 if (tstsock == -1)
610 aihints.ai_family = PF_INET;
611 else
612 close(tstsock);
613 aihints.ai_socktype = SOCK_STREAM;
614 aihints.ai_flags = AI_CANONNAME;
615 unsigned int connecttimeout = _request->_connect_timeout;
616 if (connecttimeout)
617 alarm(connecttimeout);
618 signal(SIGALRM, SIG_DFL);
619 if (getaddrinfo(host.c_str(), NULL, &aihints, &ai))
620 _exit(1);
621 _exit(0);
622 }
623 close(pipefds[1]);
624 _dnspipe = pipefds[0];
625 _state = WORKER_LOOKUP;
626}
627
628void
629multifetchworker::adddnsfd(std::vector<GPollFD> &waitFds)
630{
631 if (_state != WORKER_LOOKUP)
632 return;
633
634 waitFds.push_back (
635 GPollFD {
636 .fd = _dnspipe,
637 .events = G_IO_IN | G_IO_HUP | G_IO_ERR,
638 .revents = 0
639 });
640}
641
642void
643multifetchworker::dnsevent( const std::vector<GPollFD> &waitFds )
644{
645 bool hasEvent = std::any_of( waitFds.begin (), waitFds.end(),[this]( const GPollFD &waitfd ){
646 return ( waitfd.fd == _dnspipe && waitfd.revents != 0 );
647 });
648
649 if (_state != WORKER_LOOKUP || !hasEvent)
650 return;
651 int status = 0;
652 zyppng::eintrSafeCall( waitpid, _pid, &status, 0);
653 _pid = 0;
654 if (_dnspipe != -1)
655 {
656 close(_dnspipe);
657 _dnspipe = -1;
658 }
659 if (!WIFEXITED(status))
660 {
661 _state = WORKER_BROKEN;
662 setCurlError("DNS lookup failed");
663 _request->_activeworkers--;
664 return;
665 }
666 int exitcode = WEXITSTATUS(status);
667 XXX << "#" << _workerno << ": DNS lookup returned " << exitcode << endl;
668 if (exitcode != 0)
669 {
670 _state = WORKER_BROKEN;
671 setCurlError("DNS lookup failed");
672 _request->_activeworkers--;
673 return;
674 }
675 _request->_context->setDNSok(_url.getHost());
676 nextjob();
677}
678
679bool multifetchworker::recheckChecksum( off_t workerRangeIdx )
680{
681 // XXX << "recheckChecksum block " << _blkno << endl;
682 if (!_request->_fp || !_datasize || !_blocks.size() )
683 return true;
684
685 auto &blk = _blocks[workerRangeIdx];
686 if ( !blk._digest )
687 return true;
688
689 const auto currOf = ftell( _request->_fp );
690 if ( currOf == -1 )
691 return false;
692
693 if (fseeko(_request->_fp, blk.start, SEEK_SET))
694 return false;
695
696 zypp::Digest newDig = blk._digest->clone();
697
698 char buf[4096];
699 size_t l = blk.len;
700 while (l) {
701 size_t cnt = l > sizeof(buf) ? sizeof(buf) : l;
702 if (fread(buf, cnt, 1, _request->_fp) != 1)
703 return false;
704 newDig.update(buf, cnt);
705 l -= cnt;
706 }
707
708 if (fseeko(_request->_fp, currOf, SEEK_SET))
709 return false;
710
711 blk._digest = std::move(newDig);
712 if (!_multiByteHandler->validateRange(blk)) {
713 WAR << "#" << _workerno << " Stripe: " << _stripe << ": Stripe-Block: " << _rangeToStripeBlock[workerRangeIdx] << " failed to validate" << endl;
714 return false;
715 }
716
717 return true;
718}
719
723MultiByteHandler::Range multifetchworker::rangeFromBlock( off_t blockNo ) const
724{
725 UByteArray sum;
726 std::optional<zypp::Digest> digest;
727 std::optional<size_t> relDigLen;
728 std::optional<size_t> blkSumPad;
729
730 const auto &blk = _request->_blklist.getBlock( blkNo );
731 if ( _request->_blklist.haveChecksum ( blkNo ) ) {
732 sum = _request->_blklist.getChecksum( blkNo );
733 relDigLen = sum.size( );
734 blkSumPad = _request->_blklist.checksumPad( );
735 digest = zypp::Digest();
736 digest->create( _request->_blklist.getChecksumType() );
737 }
738
739 return MultiByteHandler::Range::make(
740 blk.off,
741 blk.size,
742 std::move(digest),
743 std::move(sum),
744 {}, // empty user data
745 std::move(relDigLen),
746 std::move(blkSumPad) );
747}
748
749void multifetchworker::stealjob()
750{
751 if (!_request->_stealing)
752 {
753 XXX << "start stealing!" << endl;
754 _request->_stealing = true;
755 }
756
757 multifetchworker *best = 0; // best choice for the worker we want to compete with
758 double now = 0;
759
760 // look through all currently running workers to find the best candidate we
761 // could steal from
762 for (auto workeriter = _request->_workers.begin(); workeriter != _request->_workers.end(); ++workeriter)
763 {
764 multifetchworker *worker = workeriter->get();
765 if (worker == this)
766 continue;
767 if (worker->_pass == -1)
768 continue; // do not steal!
769 if (worker->_state == WORKER_DISCARD || worker->_state == WORKER_DONE || worker->_state == WORKER_SLEEP || !worker->_datasize)
770 continue; // do not steal finished jobs
771 if (!worker->_avgspeed && worker->_datareceived)
772 {
773 // calculate avg speed for the worker if that was not done yet
774 if (!now)
775 now = currentTime();
776 if (now > worker->_starttime)
777 worker->_avgspeed = worker->_datareceived / (now - worker->_starttime);
778 }
779 // only consider workers that still have work
780 if ( worker->_datasize - worker->_datareceived <= 0 )
781 continue;
782 if (!best || best->_pass > worker->_pass)
783 {
784 best = worker;
785 continue;
786 }
787 if (best->_pass < worker->_pass)
788 continue;
789 // if it is the same stripe, our current best choice is competing with the worker we are looking at
790 // we need to check if we are faster than the fastest one competing for this stripe, so we want the best.
791 // Otherwise the worst.
792 if (worker->_stripe == best->_stripe)
793 {
794 if ((worker->_datasize - worker->_datareceived) * best->_avgspeed < (best->_datasize - best->_datareceived) * worker->_avgspeed)
795 best = worker;
796 }
797 else
798 {
799 if ((worker->_datasize - worker->_datareceived) * best->_avgspeed > (best->_datasize - best->_datareceived) * worker->_avgspeed)
800 best = worker;
801 }
802 }
803 if (!best)
804 {
805 _state = WORKER_DONE;
806 _request->_activeworkers--;
807 _request->_finished = true;
808 return;
809 }
810 // do not sleep twice
811 if (_state != WORKER_SLEEP)
812 {
813 if (!_avgspeed && _datareceived)
814 {
815 if (!now)
816 now = currentTime();
817 if (now > _starttime)
818 _avgspeed = _datareceived / (now - _starttime);
819 }
820
821 // let's see if we should sleep a bit
822 XXX << "me #" << _workerno << ": " << _avgspeed << ", size " << best->_datasize << endl;
823 XXX << "best #" << best->_workerno << ": " << best->_avgspeed << ", size " << (best->_datasize - best->_datareceived) << endl;
824
825 // check if we could download the full data from best faster than best could download its remaining data
826 if ( _avgspeed && best->_avgspeed // we and best have average speed information
827 && _avgspeed <= best->_avgspeed ) // and we are not faster than best
828 {
829 if (!now)
830 now = currentTime();
831 double sl = (best->_datasize - best->_datareceived) / best->_avgspeed * 2;
832 if (sl > 1)
833 sl = 1;
834 XXX << "#" << _workerno << ": going to sleep for " << sl * 1000 << " ms" << endl;
835 _sleepuntil = now + sl;
836 _state = WORKER_SLEEP;
837 _request->_sleepworkers++;
838 return;
839 }
840 }
841
842 _competing = true;
843 best->_competing = true;
844 _stripe = best->_stripe;
845
846 best->_pass++;
847 _pass = best->_pass;
848
849 runjob();
850}
851
852void
853multifetchworker::disableCompetition()
854{
855 for ( auto workeriter = _request->_workers.begin(); workeriter != _request->_workers.end(); ++workeriter)
856 {
857 multifetchworker *worker = workeriter->get();
858 if (worker == this)
859 continue;
860 if (worker->_stripe == _stripe)
861 {
862 if (worker->_state == WORKER_FETCH)
863 worker->_state = WORKER_DISCARD;
864 worker->_pass = -1; /* do not steal this one, we already have it */
865 }
866 }
867}
868
869void multifetchworker::nextjob()
870{
871 _datasize = 0;
872 _blocks.clear();
873
874 // claim the next stripe for us, or steal if there is nothing left to claim
875 if ( _request->_stripeNo >= off_t(_request->_requiredStripes.size()) ) {
876 stealjob();
877 return;
878 }
879
880 _stripe = _request->_stripeNo++;
881 runjob();
882}
883
884void multifetchworker::runjob()
885{
886 _datasize = 0;
887 _blocks.clear ();
888 _rangeToStripeBlock.clear ();
889
890 auto &stripeDesc = _request->_requiredStripes[_stripe];
891 for ( uint i = 0; i < stripeDesc.blocks.size(); i++ ) {
892 // ignore verified and finalized ranges
893 if( stripeDesc.blockStates[i] == Stripe::FINALIZED ) {
894 continue;
895 } else {
896 _blocks.push_back( rangeFromBlock(stripeDesc.blocks[i]) );
897 _rangeToStripeBlock.push_back( i );
898 _datasize += _blocks.back().len;
899 }
900 }
901
902 if ( _datasize == 0 ) {
903 // no blocks left in this stripe
904 _state = WORKER_DONE;
905 _request->_activeworkers--;
906 if ( !_request->_activeworkers )
907 _request->_finished = true;
908 return;
909 }
910
911 DBG << "#" << _workerno << ": Done adding blocks, going to download " << _blocks.size() << " blocks with " << _datasize << " bytes" << std::endl;
912
913 _multiByteHandler.reset( nullptr );
914 _multiByteHandler = std::make_unique<MultiByteHandler>(_protocolMode, _curl, _blocks, *this );
915 _starttime = currentTime();
916 _datareceived = 0;
917 run();
918}
919
920bool multifetchworker::continueJob()
921{
922 bool hadRangeFail = _multiByteHandler->lastError() == MultiByteHandler::Code::RangeFail;
923 if ( !_multiByteHandler->prepareToContinue() ) {
924 setCurlError(_multiByteHandler->lastErrorMessage().c_str());
925 return false;
926 }
927
928 if ( hadRangeFail ) {
929 // we reset the handle to default values. We do this to not run into
930 // "transfer closed with outstanding read data remaining" error CURL sometimes returns when
931 // we cancel a connection because of a range error to request a smaller batch.
932 // The error will still happen but much less frequently than without resetting the handle.
933 //
934 // Note: Even creating a new handle will NOT fix the issue
935 curl_easy_reset( _curl );
936 if ( !setupHandle())
937 return false;
938 }
939
940 run();
941 return true;
942}
943
944void
945multifetchworker::run()
946{
947 if ( !_multiByteHandler )
948 return; // just in case...
949
950 if ( !_multiByteHandler->prepare() ) {
951 _request->_activeworkers--;
952 _state = WORKER_BROKEN;
953 setCurlError(_multiByteHandler->lastErrorMessage ().c_str());
954 return;
955 }
956
957 if (curl_multi_add_handle(_request->_multi, _curl) != CURLM_OK) {
958 _request->_activeworkers--;
959 _state = WORKER_BROKEN;
960 setCurlError("curl_multi_add_handle failed");
961 return;
962 }
963
964 _request->_havenewjob = true;
965 _state = WORKER_FETCH;
966}
967
968
970
971
972multifetchrequest::multifetchrequest(const MediaMultiCurl *context, Pathname filename, Url baseurl, CURLM *multi, FILE *fp, callback::SendReport<DownloadProgressReport> *report, MediaBlockList &&blklist, off_t filesize)
973 : internal::CurlPollHelper::CurlPoll{ multi }
974 , _context(context)
975 , _filename(std::move(filename))
976 , _baseurl(std::move(baseurl))
977 , _fp(fp)
978 , _report(report)
979 , _blklist(std::move(blklist))
980 , _filesize(filesize)
981 , _starttime(currentTime())
982 , _timeout(context->_settings.timeout())
983 , _connect_timeout(context->_settings.connectTimeout())
984 , _maxspeed(context->_settings.maxDownloadSpeed())
985 , _maxworkers(context->_settings.maxConcurrentConnections())
986 {
987 _lastperiodstart = _lastprogress = _starttime;
988 if (_maxworkers > MAXURLS)
989 _maxworkers = MAXURLS;
990 if (_maxworkers <= 0)
991 _maxworkers = 1;
992
993 // calculate the total size of our download
994 for (size_t blkno = 0; blkno < _blklist.numBlocks(); blkno++)
995 _totalsize += _blklist.getBlock(blkno).size;
996
997 // equally distribute the data we want to download over all workers
998 _defaultBlksize = makeBlksize( _maxworkers, _totalsize );
999
1000 // let's build the stripe information
1001 zypp::ByteCount currStripeSize = 0;
1002 for (size_t blkno = 0; blkno < _blklist.numBlocks(); blkno++) {
1003
1004 const MediaBlock &blk = _blklist.getBlock(blkno);
1005 if ( _requiredStripes.empty() || currStripeSize >= _defaultBlksize ) {
1006 _requiredStripes.push_back( Stripe{} );
1007 currStripeSize = 0;
1008 }
1009
1010 _requiredStripes.back().blocks.push_back(blkno);
1011 _requiredStripes.back().blockStates.push_back(Stripe::PENDING);
1012 currStripeSize += blk.size;
1013 }
1014
1015 MIL << "Downloading " << _blklist.numBlocks() << " blocks via " << _requiredStripes.size() << " stripes on " << _maxworkers << " connections." << endl;
1016}
1017
1022
1023void
1024multifetchrequest::run(std::vector<Url> &urllist)
1025{
1026 int workerno = 0;
1027 std::vector<Url>::iterator urliter = urllist.begin();
1028
1029 internal::CurlPollHelper _curlHelper(*this);
1030
1031 // kickstart curl
1032 CURLMcode mcode = _curlHelper.handleTimout();
1033 if (mcode != CURLM_OK)
1034 ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_socket_action", "unknown error"));
1035
1036 for (;;)
1037 {
1038 // list of all fds we want to poll
1039 std::vector<GPollFD> waitFds;
1040 int dnsFdCount = 0;
1041
1042 if (_finished)
1043 {
1044 XXX << "finished!" << endl;
1045 break;
1046 }
1047
1048 if ((int)_activeworkers < _maxworkers && urliter != urllist.end() && _workers.size() < MAXURLS)
1049 {
1050 // spawn another worker!
1051 _workers.push_back(std::make_unique<multifetchworker>(workerno++, *this, *urliter));
1052 auto &worker = _workers.back();
1053 if (worker->_state != WORKER_BROKEN)
1054 {
1055 _activeworkers++;
1056 if (worker->_state != WORKER_LOOKUP)
1057 {
1058 worker->nextjob();
1059 }
1060 else
1061 _lookupworkers++;
1062 }
1063 ++urliter;
1064 continue;
1065 }
1066 if (!_activeworkers)
1067 {
1068 WAR << "No more active workers!" << endl;
1069 // show the first worker error we find
1070 for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
1071 {
1072 if ((*workeriter)->_state != WORKER_BROKEN)
1073 continue;
1074 ZYPP_THROW(MediaCurlException(_baseurl, "Server error", (*workeriter)->curlError()));
1075 }
1076 break;
1077 }
1078
1079 if (_lookupworkers)
1080 for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
1081 (*workeriter)->adddnsfd( waitFds );
1082
1083 // if we added a new job we have to call multi_perform once
1084 // to make it show up in the fd set. do not sleep in this case.
1085 // setting timeout to 0 and not -1 will make sure the poll() case further down is actually entered, do not change
1086 // without adapting the logic around poll
1087 int timeoutMs = _havenewjob ? 0 : 200;
1088 if ( _sleepworkers && !_havenewjob ) {
1089 if (_minsleepuntil == 0) {
1090 for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter) {
1091 multifetchworker *worker = workeriter->get();
1092 if (worker->_state != WORKER_SLEEP)
1093 continue;
1094 if (!_minsleepuntil || _minsleepuntil > worker->_sleepuntil)
1095 _minsleepuntil = worker->_sleepuntil;
1096 }
1097 }
1098 double sl = _minsleepuntil - currentTime();
1099 if (sl < 0) {
1100 sl = 0;
1101 _minsleepuntil = 0;
1102 }
1103 if (sl < .2)
1104 timeoutMs = sl * 1000;
1105 }
1106
1107 if ( _curlHelper.timeout_ms.has_value() )
1108 timeoutMs = std::min<long>( timeoutMs, _curlHelper.timeout_ms.value() );
1109
1110 dnsFdCount = waitFds.size(); // remember how many dns fd's we have
1111 waitFds.insert( waitFds.end(), _curlHelper.socks.begin(), _curlHelper.socks.end() ); // add the curl fd's to the poll data
1112
1113 // run poll only if we either have a valid timeout or sockets to wait for... otherwise we end up waiting forever (bsc#1230912)
1114 if ( !waitFds.empty() || timeoutMs != -1) {
1115 int r = zypp_detail::zypp_poll( waitFds, timeoutMs );
1116 if ( r == -1 )
1117 ZYPP_THROW(MediaCurlException(_baseurl, "zypp_poll() failed", "unknown error"));
1118 if ( r != 0 && _lookupworkers ) {
1119 for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
1120 {
1121 multifetchworker *worker = workeriter->get();
1122 if (worker->_state != WORKER_LOOKUP)
1123 continue;
1124 (*workeriter)->dnsevent( waitFds );
1125 if (worker->_state != WORKER_LOOKUP)
1126 _lookupworkers--;
1127 }
1128 }
1129
1130 // run curl
1131 if ( r == 0 ) {
1132 CURLMcode mcode = _curlHelper.handleTimout();
1133 if (mcode != CURLM_OK)
1134 ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_socket_action", "unknown error"));
1135 } else {
1136 CURLMcode mcode = _curlHelper.handleSocketActions( waitFds, dnsFdCount );
1137 if (mcode != CURLM_OK)
1138 ZYPP_THROW(MediaCurlException(_baseurl, "curl_multi_socket_action", "unknown error"));
1139 }
1140
1141 // reset havenewjobs, we just called into curl_multi_socket_action ... no need to call another time just because
1142 // we maybe added jobs during checking the dns worker events
1143 _havenewjob = false;
1144 }
1145
1146 double now = currentTime();
1147
1148 // update periodavg
1149 if (now > _lastperiodstart + .5)
1150 {
1151 if (!_periodavg)
1152 _periodavg = (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart);
1153 else
1154 _periodavg = (_periodavg + (_fetchedsize - _lastperiodfetched) / (now - _lastperiodstart)) / 2;
1155 _lastperiodfetched = _fetchedsize;
1156 _lastperiodstart = now;
1157 }
1158
1159 // wake up sleepers
1160 if (_sleepworkers)
1161 {
1162 for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
1163 {
1164 multifetchworker *worker = workeriter->get();
1165 if (worker->_state != WORKER_SLEEP)
1166 continue;
1167 if (worker->_sleepuntil > now)
1168 continue;
1169 if (_minsleepuntil == worker->_sleepuntil)
1170 _minsleepuntil = 0;
1171 XXX << "#" << worker->_workerno << ": sleep done, wake up" << endl;
1172 _sleepworkers--;
1173 // nextjob changes the state
1174 worker->nextjob();
1175 }
1176 }
1177
1178 // collect all curl results, (re)schedule jobs
1179 CURLMsg *msg = nullptr;
1180 int nqueue = 0;
1181 while ((msg = curl_multi_info_read(_multi, &nqueue)) != 0)
1182 {
1183 if (msg->msg != CURLMSG_DONE)
1184 continue;
1185 CURL *easy = msg->easy_handle;
1186 CURLcode cc = msg->data.result;
1187 multifetchworker *worker = nullptr;
1188
1189 if (curl_easy_getinfo(easy, CURLINFO_PRIVATE, &worker) != CURLE_OK)
1190 ZYPP_THROW(MediaCurlException(_baseurl, "curl_easy_getinfo", "unknown error"));
1191
1192 if (worker->_datareceived && now > worker->_starttime) {
1193 if (worker->_avgspeed)
1194 worker->_avgspeed = (worker->_avgspeed + worker->_datareceived / (now - worker->_starttime)) / 2;
1195 else
1196 worker->_avgspeed = worker->_datareceived / (now - worker->_starttime);
1197 }
1198
1199 XXX << "#" << worker->_workerno << " done code " << cc << " speed " << worker->_avgspeed << endl;
1200 curl_multi_remove_handle(_multi, easy);
1201
1202 const auto &setWorkerBroken = [&]( const std::string &str = {} ){
1203 worker->_state = WORKER_BROKEN;
1204 if ( !str.empty () )
1205 worker->setCurlError(str.c_str());
1206 _activeworkers--;
1207
1208 if (!_activeworkers && !(urliter != urllist.end() && _workers.size() < MAXURLS)) {
1209 // end of workers reached! goodbye!
1210 worker->evaluateCurlCode(Pathname(), cc, false);
1211 }
1212 };
1213
1214 if ( !worker->_multiByteHandler ) {
1215 WAR << "#" << worker->_workerno << ": has no multibyte handler, this is a bug" << endl;
1216 setWorkerBroken("Multibyte handler error");
1217 continue;
1218 }
1219
1220 // tell the worker to finalize the current block
1221 worker->_multiByteHandler->finalize();
1222
1223 if ( worker->_multiByteHandler->hasMoreWork() && ( cc == CURLE_OK || worker->_multiByteHandler->canRecover() ) ) {
1224
1225 WAR << "#" << worker->_workerno << ": still has work to do or can recover from an error, continuing the job!" << endl;
1226 // the current job is not done, or we failed and need to try more, enqueue and start again
1227 if ( !worker->continueJob() ) {
1228 WAR << "#" << worker->_workerno << ": failed to continue (" << worker->_multiByteHandler->lastErrorMessage() << ")" << endl;
1229 setWorkerBroken( worker->_multiByteHandler->lastErrorMessage() );
1230 }
1231 continue;
1232 }
1233
1234 // --- from here on the worker has no more ranges in its current job, or had an error it can't recover from ---
1235
1236 if ( cc != CURLE_OK ) {
1237 if ( worker->_state != WORKER_DISCARD ) {
1238 // something went wrong and we can not recover, broken worker!
1239 setWorkerBroken();
1240 continue;
1241 } else {
1242 WAR << "#" << worker->_workerno << ": failed, but was set to discard, reusing for new requests" << endl;
1243 }
1244 } else {
1245
1246 // we got what we asked for, maybe. Let's see if all ranges have been marked as finalized
1247 if( !worker->_multiByteHandler->verifyData() ) {
1248 WAR << "#" << worker->_workerno << ": error: " << worker->_multiByteHandler->lastErrorMessage() << ", disable worker" << endl;
1249 setWorkerBroken();
1250 continue;
1251 }
1252
1253 // from here on we know THIS worker only got data that verified
1254 // now lets see if the stripe was finished too
1255 // stripe blocks can now be only in FINALIZED or ERROR states
1256 if (worker->_state == WORKER_FETCH ) {
1257 if ( worker->_competing ) {
1258 worker->disableCompetition ();
1259 }
1260 auto &wrkerStripe = _requiredStripes[worker->_stripe];
1261 bool done = std::all_of( wrkerStripe.blockStates.begin(), wrkerStripe.blockStates.end(), []( const Stripe::RState s ) { return s == Stripe::FINALIZED; } );
1262 if ( !done ) {
1263 // all ranges that are not finalized are in a bogus state, refetch them
1264 std::for_each( wrkerStripe.blockStates.begin(), wrkerStripe.blockStates.end(), []( Stripe::RState &s ) {
1265 if ( s != Stripe::FINALIZED)
1266 s = Stripe::PENDING;
1267 });
1268
1269 _finished = false; //reset finished flag
1270 worker->runjob();
1271 continue;
1272 }
1273 }
1274
1275 // make bad workers ( bad as in really slow ) sleep a little
1276 double maxavg = 0;
1277 int maxworkerno = 0;
1278 int numbetter = 0;
1279 for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
1280 {
1281 multifetchworker *oworker = workeriter->get();
1282 if (oworker->_state == WORKER_BROKEN)
1283 continue;
1284 if (oworker->_avgspeed > maxavg)
1285 {
1286 maxavg = oworker->_avgspeed;
1287 maxworkerno = oworker->_workerno;
1288 }
1289 if (oworker->_avgspeed > worker->_avgspeed)
1290 numbetter++;
1291 }
1292 if (maxavg && !_stealing)
1293 {
1294 double ratio = worker->_avgspeed / maxavg;
1295 ratio = 1 - ratio;
1296 if (numbetter < 3) // don't sleep that much if we're in the top three
1297 ratio = ratio * ratio;
1298 if (ratio > .01)
1299 {
1300 XXX << "#" << worker->_workerno << ": too slow ("<< ratio << ", " << worker->_avgspeed << ", #" << maxworkerno << ": " << maxavg << "), going to sleep for " << ratio * 1000 << " ms" << endl;
1301 worker->_sleepuntil = now + ratio;
1302 worker->_state = WORKER_SLEEP;
1303 _sleepworkers++;
1304 continue;
1305 }
1306 }
1307
1308 // do rate control (if requested)
1309 // should use periodavg, but that's not what libcurl does
1310 if (_maxspeed && now > _starttime)
1311 {
1312 double avg = _fetchedsize / (now - _starttime);
1313 avg = worker->_maxspeed * _maxspeed / avg;
1314 if (avg < _maxspeed / _maxworkers)
1315 avg = _maxspeed / _maxworkers;
1316 if (avg > _maxspeed)
1317 avg = _maxspeed;
1318 if (avg < 1024)
1319 avg = 1024;
1320 worker->_maxspeed = avg;
1321#if CURLVERSION_AT_LEAST(7,15,5)
1322 curl_easy_setopt(worker->_curl, CURLOPT_MAX_RECV_SPEED_LARGE, (curl_off_t)(avg));
1323#endif
1324 }
1325
1326 worker->nextjob();
1327 }
1328
1329 if ( _filesize > 0 && _fetchedgoodsize > _filesize ) {
1330 ZYPP_THROW(MediaFileSizeExceededException(_baseurl, _filesize));
1331 }
1332 }
1333
1334 // send report
1335 if (_report)
1336 {
1337 int percent = _totalsize ? (100 * (_fetchedgoodsize + _fetchedsize)) / (_totalsize + _fetchedsize) : 0;
1338
1339 double avg = 0;
1340 if (now > _starttime)
1341 avg = _fetchedsize / (now - _starttime);
1342 if (!(*(_report))->progress(percent, _baseurl, avg, _lastperiodstart == _starttime ? avg : _periodavg))
1343 ZYPP_THROW(MediaCurlException(_baseurl, "User abort", "cancelled"));
1344 }
1345
1346 if (_timeout && now - _lastprogress > _timeout)
1347 break;
1348 }
1349
1350 if (!_finished)
1351 ZYPP_THROW(MediaTimeoutException(_baseurl));
1352
1353 // print some download stats
1354 WAR << "overall result" << endl;
1355 for (auto workeriter = _workers.begin(); workeriter != _workers.end(); ++workeriter)
1356 {
1357 multifetchworker *worker = workeriter->get();
1358 WAR << "#" << worker->_workerno << ": state: " << worker->_state << " received: " << worker->_received << " url: " << worker->_url << endl;
1359 }
1360}
1361
1362inline zypp::ByteCount multifetchrequest::makeBlksize ( uint maxConns, size_t filesize )
1363{
1364 // If the calculated stripe size is too small, it can cause a loss in TCP throughput. Raise
1365 // it to a reasonable value.
1366 return std::max<zypp::ByteCount>( filesize / std::min( std::max<int>( 1, maxConns ) , MAXURLS ), zypp::ByteCount(MIN_STRIPE_SIZE_KB, zypp::ByteCount::K) );
1367}
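Worked through with illustrative numbers (ByteCount::K is 1024, so the cap is 4096 KiB = 4 MiB):

// Illustrative arithmetic only:
// makeBlksize( 5, 100 * 1024 * 1024 ) -> 100 MiB / 5 = 20 MiB per stripe (above the cap)
// makeBlksize( 4, 6 * 1024 * 1024 )   -> 6 MiB / 4 = 1.5 MiB, raised to the 4 MiB minimum,
//                                        so the file is fetched in two larger requests
//                                        instead of four small ones.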
1368
1370
1371
1372MediaMultiCurl::MediaMultiCurl(const Url &url_r, const Pathname & attach_point_hint_r)
1373 : MediaCurl(url_r, attach_point_hint_r)
1374{
1375 MIL << "MediaMultiCurl::MediaMultiCurl(" << url_r << ", " << attach_point_hint_r << ")" << endl;
1376 _multi = 0;
1377 _customHeadersMetalink = 0;
1378}
1379
1380MediaMultiCurl::~MediaMultiCurl()
1381{
1382 if (_customHeadersMetalink)
1383 {
1384 curl_slist_free_all(_customHeadersMetalink);
1385 _customHeadersMetalink = NULL;
1386 }
1387 if (_multi)
1388 {
1389 curl_multi_cleanup(_multi);
1390 _multi = 0;
1391 }
1392 std::map<std::string, CURL *>::iterator it;
1393 for (it = _easypool.begin(); it != _easypool.end(); it++)
1394 {
1395 CURL *easy = it->second;
1396 if (easy)
1397 {
1398 curl_easy_cleanup(easy);
1399 it->second = NULL;
1400 }
1401 }
1402}
1403
1404void MediaMultiCurl::setupEasy()
1405{
1406 MediaCurl::setupEasy();
1407
1408 if (_customHeadersMetalink)
1409 {
1410 curl_slist_free_all(_customHeadersMetalink);
1411 _customHeadersMetalink = NULL;
1412 }
1413 struct curl_slist *sl = _customHeaders;
1414 for (; sl; sl = sl->next)
1415 _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, sl->data);
1416 //, application/x-zsync
1417 _customHeadersMetalink = curl_slist_append(_customHeadersMetalink, "Accept: */*, application/x-zsync, application/metalink+xml, application/metalink4+xml");
1418}
1419
1420// here we try to suppress all progress coming from a metalink download
1421// bsc#1021291: Nevertheless send alive trigger (without stats), so UIs
1422// are able to abort a hanging metalink download via callback response.
1423int MediaMultiCurl::progressCallback( void *clientp, curl_off_t dltotal, curl_off_t dlnow, curl_off_t ultotal, curl_off_t ulnow)
1424{
1425 CURL *_curl = progressCallback_getcurl(clientp);
1426 if (!_curl)
1427 return MediaCurl::aliveCallback(clientp, dltotal, dlnow, ultotal, ulnow);
1428
1429 // bsc#408814: Don't report any sizes before we don't have data on disk. Data reported
1430 // due to redirection etc. are not interesting, but may disturb filesize checks.
1431 FILE *fp = 0;
1432 if ( curl_easy_getinfo( _curl, CURLINFO_PRIVATE, &fp ) != CURLE_OK || !fp )
1433 return MediaCurl::aliveCallback( clientp, dltotal, dlnow, ultotal, ulnow );
1434 if ( ftell( fp ) == 0 )
1435 return MediaCurl::aliveCallback( clientp, dltotal, 0.0, ultotal, ulnow );
1436
1437 // (no longer needed due to the filesize check above?)
1438 // work around curl bug that gives us old data
1439 long httpReturnCode = 0;
1440 if (curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &httpReturnCode ) != CURLE_OK || httpReturnCode == 0)
1441 return MediaCurl::aliveCallback(clientp, dltotal, dlnow, ultotal, ulnow);
1442
1443 char *ptr = NULL;
1444 bool ismetalink = false;
1445 if (curl_easy_getinfo(_curl, CURLINFO_CONTENT_TYPE, &ptr) == CURLE_OK && ptr)
1446 {
1447 std::string ct = std::string(ptr);
1448 if (ct.find("application/x-zsync") == 0 || ct.find("application/metalink+xml") == 0 || ct.find("application/metalink4+xml") == 0)
1449 ismetalink = true;
1450 }
1451 if (!ismetalink && dlnow < 256)
1452 {
1453 // can't tell yet, ...
1454 return MediaCurl::aliveCallback(clientp, dltotal, dlnow, ultotal, ulnow);
1455 }
1456 if (!ismetalink)
1457 {
1458 fflush(fp);
1459 ismetalink = looks_like_meta_file(fp) != MetaDataType::None;
1460 DBG << "looks_like_meta_file: " << ismetalink << endl;
1461 }
1462 if (ismetalink)
1463 {
1464 // this is a metalink file, change the expected filesize:
1465 MediaCurl::resetExpectedFileSize( clientp, ByteCount( 2, ByteCount::MB ) );
1466 // we're downloading the metalink file. Just trigger aliveCallbacks
1467 curl_easy_setopt(_curl, CURLOPT_XFERINFOFUNCTION, &MediaCurl::aliveCallback);
1468 return MediaCurl::aliveCallback(clientp, dltotal, dlnow, ultotal, ulnow);
1469 }
1470 curl_easy_setopt(_curl, CURLOPT_XFERINFOFUNCTION, &MediaCurl::progressCallback);
1471 return MediaCurl::progressCallback(clientp, dltotal, dlnow, ultotal, ulnow);
1472}
1473
1474void MediaMultiCurl::doGetFileCopy( const OnMediaLocation &srcFile , const Pathname & target, callback::SendReport<DownloadProgressReport> & report, RequestOptions options ) const
1475{
1476 Pathname dest = target.absolutename();
1477 if( assert_dir( dest.dirname() ) )
1478 {
1479 DBG << "assert_dir " << dest.dirname() << " failed" << endl;
1480 ZYPP_THROW( MediaSystemException(getFileUrl(srcFile.filename()), "System error on " + dest.dirname().asString()) );
1481 }
1482
1483 ManagedFile destNew { target.extend( ".new.zypp.XXXXXX" ) };
1484 AutoFILE file;
1485 {
1486 AutoFREE<char> buf { ::strdup( (*destNew).c_str() ) };
1487 if( ! buf )
1488 {
1489 ERR << "out of memory for temp file name" << endl;
1490 ZYPP_THROW(MediaSystemException(getFileUrl(srcFile.filename()), "out of memory for temp file name"));
1491 }
1492
1493 AutoFD tmp_fd { ::mkostemp( buf, O_CLOEXEC ) };
1494 if( tmp_fd == -1 )
1495 {
1496 ERR << "mkstemp failed for file '" << destNew << "'" << endl;
1497 ZYPP_THROW(MediaWriteException(destNew));
1498 }
1499 destNew = ManagedFile( (*buf), filesystem::unlink );
1500
1501 file = ::fdopen( tmp_fd, "we" );
1502 if ( ! file )
1503 {
1504 ERR << "fopen failed for file '" << destNew << "'" << endl;
1505 ZYPP_THROW(MediaWriteException(destNew));
1506 }
1507 tmp_fd.resetDispose(); // don't close it here! ::fdopen moved ownership to file
1508 }
1509
1510 DBG << "dest: " << dest << endl;
1511 DBG << "temp: " << destNew << endl;
1512
1513 // set IFMODSINCE time condition (no download if not modified)
1514 if( PathInfo(target).isExist() && !(options & OPTION_NO_IFMODSINCE) )
1515 {
1516 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_IFMODSINCE);
1517 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, (long)PathInfo(target).mtime());
1518 }
1519 else
1520 {
1521 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
1522 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
1523 }
1524 // change header to include Accept: metalink
1525 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeadersMetalink);
1526 // change to our own progress function
1527 curl_easy_setopt(_curl, CURLOPT_XFERINFOFUNCTION, &progressCallback);
1528 curl_easy_setopt(_curl, CURLOPT_PRIVATE, (*file) ); // important to pass the FILE* explicitly (passing through varargs)
1529 try
1530 {
1531 MediaCurl::doGetFileCopyFile( srcFile, dest, file, report, options );
1532 }
1533 catch (Exception &ex)
1534 {
1535 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
1536 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
1537 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
1538 curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
1539 ZYPP_RETHROW(ex);
1540 }
1541 curl_easy_setopt(_curl, CURLOPT_TIMECONDITION, CURL_TIMECOND_NONE);
1542 curl_easy_setopt(_curl, CURLOPT_TIMEVALUE, 0L);
1543 curl_easy_setopt(_curl, CURLOPT_HTTPHEADER, _customHeaders);
1544 curl_easy_setopt(_curl, CURLOPT_PRIVATE, (void *)0);
1545 long httpReturnCode = 0;
1546 CURLcode infoRet = curl_easy_getinfo(_curl, CURLINFO_RESPONSE_CODE, &httpReturnCode);
1547 if (infoRet == CURLE_OK)
1548 {
1549 DBG << "HTTP response: " + str::numstring(httpReturnCode) << endl;
1550 if ( httpReturnCode == 304
1551 || ( httpReturnCode == 213 && _url.getScheme() == "ftp" ) ) // not modified
1552 {
1553 DBG << "not modified: " << PathInfo(dest) << endl;
1554 return;
1555 }
1556 }
1557 else
1558 {
1559 WAR << "Could not get the response code." << endl;
1560 }
1561
1562 MetaDataType ismetalink = MetaDataType::None;
1563
1564 char *ptr = NULL;
1565 if (curl_easy_getinfo(_curl, CURLINFO_CONTENT_TYPE, &ptr) == CURLE_OK && ptr)
1566 {
1567 std::string ct = std::string(ptr);
1568 if (ct.find("application/x-zsync") == 0 )
1569 ismetalink = MetaDataType::Zsync;
1570 else if (ct.find("application/metalink+xml") == 0 || ct.find("application/metalink4+xml") == 0)
1571 ismetalink = MetaDataType::MetaLink;
1572 }
1573
1574 if ( ismetalink == MetaDataType::None )
1575 {
1576 // some proxies do not store the content type, so also look at the file to find
1577 // out if we received a metalink (bnc#649925)
1578 fflush(file);
1579 ismetalink = looks_like_meta_file(destNew);
1580 }
1581
1582 if ( ismetalink != MetaDataType::None )
1583 {
1584 bool userabort = false;
1585 Pathname failedFile = ZConfig::instance().repoCachePath() / "MultiCurl.failed";
1586 file = nullptr; // explicitly close destNew before the parser reads it.
1587 try
1588 {
1589 MediaBlockList bl;
1590 std::vector<Url> urls;
1591 if ( ismetalink == MetaDataType::Zsync ) {
1592 ZsyncParser parser;
1593 parser.parse( destNew );
1594 bl = parser.getBlockList();
1595 urls = parser.getUrls();
1596
1597 XXX << getFileUrl(srcFile.filename()) << " returned zsync meta data." << std::endl;
1598 } else {
1599 MetaLinkParser mlp;
1600 mlp.parse(destNew);
1601 bl = mlp.getBlockList();
1602 urls = mlp.getUrls();
1603
1604 XXX << getFileUrl(srcFile.filename()) << " returned metalink meta data." << std::endl;
1605 }
1606
1607 if ( bl.numBlocks() )
1608 XXX << "With " << bl.numBlocks() << " nr of blocks and a blocksize of " << bl.getBlock(0).size << std::endl;
1609 else
1610 XXX << "With no blocks" << std::endl;
1611
1612 /*
1613 * github issue libzypp#277: Multicurl backend breaks with MirrorCache and Metalink with unknown filesize.
1614 * Fall back to a normal download if we have no knowledge about the filesize we want to download.
1615 */
1616 if ( !bl.haveFilesize() && ! srcFile.downloadSize() ) {
1617 XXX << "No filesize in metalink file and no expected filesize, aborting multicurl." << std::endl;
1618 ZYPP_THROW( MediaException("Multicurl requires filesize but none was provided.") );
1619 }
1620
1621#if 0
1622 Disabling this workaround for now, since we now zip ranges into bigger requests
1623 /*
1624 * bsc#1191609 In certain locations we do not receive a suitable number of metalink mirrors, and might even
1625 * download chunks serially from one and the same server. In those cases we need to fall back to a normal download.
1626 */
1627 if ( urls.size() < MIN_REQ_MIRRS ) {
1628 ZYPP_THROW( MediaException("Multicurl enabled but not enough mirrors provided") );
1629 }
1630#endif
1631
1632 // XXX << bl << endl;
1633 file = fopen((*destNew).c_str(), "w+e");
1634 if (!file)
1635 ZYPP_THROW(MediaWriteException(destNew));
1636 if (PathInfo(target).isExist())
1637 {
1638 XXX << "reusing blocks from file " << target << endl;
1639 bl.reuseBlocks(file, target.asString());
1640 XXX << bl << endl;
1641 }
1642 if (bl.haveChecksum(1) && PathInfo(failedFile).isExist())
1643 {
1644 XXX << "reusing blocks from file " << failedFile << endl;
1645 bl.reuseBlocks(file, failedFile.asString());
1646 XXX << bl << endl;
1647 filesystem::unlink(failedFile);
1648 }
1649 const Pathname& df = srcFile.deltafile();
1650 if (!df.empty())
1651 {
1652 XXX << "reusing blocks from file " << df << endl;
1653 bl.reuseBlocks(file, df.asString());
1654 XXX << bl << endl;
1655 }
1656 try
1657 {
1658 multifetch(srcFile.filename(), file, &urls, std::move(bl), &report, srcFile.downloadSize());
1659 }
1660 catch (MediaCurlException &ex)
1661 {
1662 userabort = ex.errstr() == "User abort";
1663 ZYPP_RETHROW(ex);
1664 }
1665 }
1666 catch (MediaFileSizeExceededException &ex) {
1667 ZYPP_RETHROW(ex);
1668 }
1669 catch (Exception &ex)
1670 {
1671 // something went wrong. fall back to normal download
1672 file = nullptr; // explicitly close destNew before moving it
1673 WAR << "Failed to multifetch file " << ex << ", falling back to single Curl download!" << std::endl;
1674 if (PathInfo(destNew).size() >= 63336)
1675 {
1676 ::unlink(failedFile.asString().c_str());
1677 filesystem::hardlinkCopy(destNew, failedFile);
1678 }
1679 if (userabort)
1680 {
1681 ZYPP_RETHROW(ex);
1682 }
1683 file = fopen((*destNew).c_str(), "w+e");
1684 if (!file)
1685 ZYPP_THROW(MediaWriteException(destNew));
1686
1687 // use the default progressCallback
1688 curl_easy_setopt(_curl, CURLOPT_XFERINFOFUNCTION, &MediaCurl::progressCallback);
1689 MediaCurl::doGetFileCopyFile(srcFile, dest, file, report, options | OPTION_NO_REPORT_START);
1690 }
1691 }
1692
1693 if (::fchmod( ::fileno(file), filesystem::applyUmaskTo( 0644 )))
1694 {
1695 ERR << "Failed to chmod file " << destNew << endl;
1696 }
1697
1698 file.resetDispose(); // we're going to close it manually here
1699 if (::fclose(file))
1700 {
1701 filesystem::unlink(destNew);
1702 ERR << "Fclose failed for file '" << destNew << "'" << endl;
1703 ZYPP_THROW(MediaWriteException(destNew));
1704 }
1705
1706 if ( rename( destNew, dest ) != 0 )
1707 {
1708 ERR << "Rename failed" << endl;
1709 ZYPP_THROW(MediaWriteException(dest));
1710 }
1711 destNew.resetDispose(); // no more need to unlink it
1712
1713 DBG << "done: " << PathInfo(dest) << endl;
1714}
1715
1716void MediaMultiCurl::multifetch(const Pathname & filename, FILE *fp, std::vector<Url> *urllist, MediaBlockList &&blklist, callback::SendReport<DownloadProgressReport> *report, off_t filesize) const
1717{
1718 Url baseurl(getFileUrl(filename));
1719 if (filesize == off_t(-1) && blklist.haveFilesize())
1720 filesize = blklist.getFilesize();
1721 if (!blklist.haveBlocks() && filesize != 0) {
1722 if ( filesize == -1 ) {
1723 ZYPP_THROW(MediaException("No filesize and no blocklist, falling back to normal download."));
1724 }
1725
1726 // build a blocklist on demand just so that we have ranges
1727 MIL << "Generate blocklist, since there was none in the metalink file." << std::endl;
1728
1729 off_t currOff = 0;
1730 const auto prefSize = multifetchrequest::makeBlksize( _settings.maxConcurrentConnections(), filesize );
1731
1732 while ( currOff < filesize ) {
1733
1734 auto blksize = filesize - currOff ;
1735 if ( blksize > prefSize )
1736 blksize = prefSize;
1737
1738 blklist.addBlock( currOff, blksize );
1739 currOff += blksize;
1740 }
1741
1742 XXX << "Generated blocklist: " << std::endl << blklist << std::endl << " End blocklist " << std::endl;
1743
1744 }
1745 if (filesize == 0 || !blklist.numBlocks()) {
1746 checkFileDigest(baseurl, fp, blklist);
1747 return;
1748 }
1749 if (filesize == 0)
1750 return;
1751
1752 if (!_multi)
1753 {
1754 _multi = curl_multi_init();
1755 if (!_multi)
1756 ZYPP_THROW(MediaCurlInitException(baseurl));
1757 }
1758
1759 multifetchrequest req(this, filename, baseurl, _multi, fp, report, std::move(blklist), filesize);
1760 std::vector<Url> myurllist;
1761 for (std::vector<Url>::iterator urliter = urllist->begin(); urliter != urllist->end(); ++urliter)
1762 {
1763 try
1764 {
1765 std::string scheme = urliter->getScheme();
1766 if (scheme == "http" || scheme == "https" || scheme == "ftp" || scheme == "tftp")
1767 {
1768 checkProtocol(*urliter);
1769 myurllist.push_back(internal::propagateQueryParams(*urliter, _url));
1770 }
1771 }
1772 catch (...)
1773 {
1774 }
1775 }
1776 if (!myurllist.size())
1777 myurllist.push_back(baseurl);
1778 req.run(myurllist);
1779 checkFileDigest(baseurl, fp, req.blockList() );
1780}
1781
1782void MediaMultiCurl::checkFileDigest(Url &url, FILE *fp, MediaBlockList &blklist) const
1783{
1784 if ( !blklist.haveFileChecksum() )
1785 return;
1786 if (fseeko(fp, off_t(0), SEEK_SET))
1787 ZYPP_THROW(MediaCurlException(url, "fseeko", "seek error"));
1788 Digest dig;
1789 blklist.createFileDigest(dig);
1790 char buf[4096];
1791 size_t l = 0;
1792 while ((l = fread(buf, 1, sizeof(buf), fp)) > 0)
1793 dig.update(buf, l);
1794 if (!blklist.verifyFileDigest(dig))
1795 ZYPP_THROW(MediaCurlException(url, "file verification failed", "checksum error"));
1796}
1797
1798bool MediaMultiCurl::isDNSok(const std::string &host) const
1799{
1800 return _dnsok.find(host) == _dnsok.end() ? false : true;
1801}
1802
1803void MediaMultiCurl::setDNSok(const std::string &host) const
1804{
1805 _dnsok.insert(host);
1806}
1807
1808CURL *MediaMultiCurl::fromEasyPool(const std::string &host) const
1809{
1810 if (_easypool.find(host) == _easypool.end())
1811 return 0;
1812 CURL *ret = _easypool[host];
1813 _easypool.erase(host);
1814 return ret;
1815}
1816
1817void MediaMultiCurl::toEasyPool(const std::string &host, CURL *easy) const
1818{
1819 CURL *oldeasy = _easypool[host];
1820 _easypool[host] = easy;
1821 if (oldeasy)
1822 curl_easy_cleanup(oldeasy);
1823}
1824
1825 } // namespace media
1826} // namespace zypp