IT++ 4.3.1
tcp.cpp
Go to the documentation of this file.
1
31
32#include <itpp/protocol/tcp.h>
33#include <itpp/base/itfile.h>
34#include <limits>
35#include <cstdlib>
36#include <ctime>
37
39
40#ifdef _MSC_VER
41#pragma warning(disable:4355)
42#endif
43
44namespace itpp
45{
46
47// -------------------- Default parameters ----------------------------------
// Compile-time defaults picked up by the constructors in this file.
48
49// TCP sender and receiver
50
// Header size in bytes; TCP_Sender adds it to the payload size of every
// segment before converting the total to a bit size.
51#define TCP_HEADERLENGTH 40
52
53// TCP sender
54
55#define TCP_VERSION kReno
56#define TCP_SMSS 1460
57#define TCP_INITIALCWNDREL 2 // multiplier applied to the MSS
58#define TCP_INITIALSSTHRESHREL 1 // multiplier applied to MaxCWnd
59#define TCP_MAXCWNDREL 32 // multiplier applied to the MSS
// Number of duplicate ACKs at which TCP_Sender starts a fast retransmit.
60#define TCP_DUPACKS 3
// Initial value of the sender's RTT estimate.
61#define TCP_INITIALRTT 1
// Grid to which the sender rounds its RTT estimate up.
62const double TCP_STIMERGRAN = 0.2;
// Value the sender's SWSA timer is set to.
63const double TCP_SWSATIMERVALUE = 0.2;
// Upper limit of the retransmission backoff factor.
64#define TCP_MAXBACKOFF 64
// Upper limit of the computed RTO (effectively unlimited).
65const double TCP_MAXRTO = std::numeric_limits<double>::max();
66#define TCP_IMMEDIATEBACKOFFRESET false
67#define TCP_TIMESTAMPS false
68#define TCP_KARN true
69#define TCP_NAGLE false
70#define TCP_GOBACKN true
71#define TCP_FLIGHTSIZERECOVERY false
72#define TCP_RENOCONSERVATION true
73#define TCP_CAREFULSSTHRESHREDUCTION true
74#define TCP_IGNOREDUPACKONTORECOVERY true
75#define TCP_CAREFULMULFASTRTXAVOIDANCE true
76#define TCP_RESTARTAFTERIDLE true
77
78// TCP receiver
79
80#define TCP_RMSS 1460
81const int TCP_BUFFERSIZE = std::numeric_limits<int>::max() / 4;
82#define TCP_DELAYEDACK true
83const double TCP_ACKDELAYTIME = 0.2;
84#define TCP_SENDPERIODICACKS false
85#define TCP_STRICTPERIODICACKS false
86#define TCP_PERIODICACKINTERVAL 1
87#define TCP_ACKSCHEDULINGDELAY 0
88#define TCP_ACKBUFFERWRITE false
89#define TCP_ACKBUFFERREAD true
90const int TCP_MAXUSERBLOCKSIZE = std::numeric_limits<int>::max() / 4;
91#define TCP_MINUSERBLOCKSIZE 1
92#define TCP_USERBLOCKPROCDELAY 0
93
94// TCP generator
95
96#define TCPGEN_BLOCKSIZE 1460
97
98// TCP applications
99
100#define TCPAPP_MAXNOOFACTIVEAPPS 500
101#define TCPAPP_DISTSTATARRAYSIZE 100
102#define TCPAPP_DISTSTATMAXGOODPUT 1000
103#define TCPAPP_DISTSTATMAXTRANSFERTIME 10000
104#define TCPAPP_CONDMEANSTATARRAYSIZE 100
105#define TCPAPP_CONDMEANSTATMAXREQSIZE 100000
106
107
108
// Returns the smaller of the two operands (opd2 when they are equal).
inline int min(int opd1, int opd2)
{
  if (opd1 < opd2) {
    return opd1;
  }
  return opd2;
}
113
114
// Returns the larger of the two operands (opd2 when they are equal).
inline int max(int opd1, int opd2)
{
  if (opd1 > opd2) {
    return opd1;
  }
  return opd2;
}
119
120
// Maps a double value (e.g. the RTT estimate in TCP_Sender) to the next
// higher multiple of a given granularity (e.g. the timer granularity).
//
// value        the value to round up
// granularity  step size of the result grid
//
// For a positive granularity the result is the smallest multiple of
// granularity that is >= value. A granularity of exactly zero is treated as
// "no grid": value is returned unchanged instead of evaluating value / 0,
// which would produce NaN or infinity.
inline double round(const double value, const double granularity)
{
  if (granularity == 0.0) {
    return value;
  }
  return std::ceil(value / granularity) * granularity;
}
127
128// -------------------- TCP_Segment ----------------------------------------
129
131 seq_begin(),
132 seq_end()
133{
134}
135
136TCP_Segment::TCP_Segment(const Sequence_Number &sn_begin, const Sequence_Number &sn_end) :
137 seq_begin(sn_begin),
138 seq_end(sn_end)
139{
140 it_assert(seq_begin <= seq_end, "TCP_Segment::TCP_Segment, end byte " + to_str(seq_end.value()) +
141 " < begin byte " + to_str(seq_begin.value()));
142}
143
144
145TCP_Segment::TCP_Segment(const TCP_Segment &segment) :
146 seq_begin(segment.seq_begin),
147 seq_end(segment.seq_end)
148{
149}
150
151
152TCP_Segment &TCP_Segment::operator=(const TCP_Segment &segment)
153{
154 this->seq_begin = segment.seq_begin;
155 this->seq_end = segment.seq_end;
156
157 return *this;
158}
159
160
161void TCP_Segment::combine(const TCP_Segment &segment)
162{
163 it_assert(can_be_combined(segment), "TCP_Segment::CombineWith, segments cannot be combined");
164
165 seq_begin = min(seq_begin, segment.seq_begin);
166 seq_end = max(seq_end, segment.seq_end);
167}
168
169
170std::ostream & operator<<(std::ostream &os, const TCP_Segment &segment)
171{
172 os << "(" << segment.seq_begin << "," << segment.seq_end << ")";
173 return os;
174}
175
176
177// -------------------- TCP_Packet ----------------------------------------
178TCP_Packet::TCP_Packet() :
179 fSegment(),
180 fACK(),
181 fWnd(0),
182 fSessionId(0),
183 fInfo(0)
184{
185}
186
187
188TCP_Packet::TCP_Packet(const TCP_Packet &packet) :
189 fSegment(packet.fSegment),
190 fACK(packet.fACK),
191 fWnd(packet.fWnd),
192 fSessionId(packet.fSessionId),
193 fInfo(0)
194{
195 std::cout << "TCP_Packet::TCP_Packet ############" << " ";
196
197 if (packet.fInfo != 0) {
198 std::cout << "TCP_Packet::TCP_Packet rhs.fInfo ###########" << " ";
199 fInfo = new TDebugInfo(*packet.fInfo);
200 }
201}
202
203
204TCP_Packet::~TCP_Packet()
205{
206 delete fInfo;
207}
208
209
210TCP_Packet & TCP_Packet::clone() const
211{
212 return *new TCP_Packet(*this);
213}
214
215
216void TCP_Packet::set_info(unsigned ssThresh, unsigned recWnd, unsigned cWnd,
217 double estRTT, Sequence_Number sndUna,
218 Sequence_Number sndNxt, bool isRtx)
219{
220 if (fInfo == 0) {
221 fInfo = new TDebugInfo;
222 }
223
224 fInfo->fSSThresh = ssThresh;
225 fInfo->fRecWnd = recWnd;
226 fInfo->fCWnd = cWnd;
227 fInfo->fRTTEstimate = estRTT;
228 fInfo->fSndUna = sndUna;
229 fInfo->fSndNxt = sndNxt;
230 fInfo->fRtxFlag = isRtx;
231}
232
233
234void TCP_Packet::print_header(std::ostream &) const
235{
236 std::cout << "Hello!\n";
237
238 std::cout << "Ses = " << get_session_id() << " ";
239
240 std::cout << "Segment = " << get_segment() << " "
241 << "ACK = " << get_ACK() << " "
242 << "Wnd = " << get_wnd() << " ";
243
244 std::cout << "DestPort = " << fDestinationPort << " "
245 << "SourcePort = " << fSourcePort << " ";
246
247
248 if (fInfo != 0) {
249 std::cout << "SndSSThresh = " << fInfo->fSSThresh << " ";
250 std::cout << "RecWnd = " << fInfo->fRecWnd << " ";
251 std::cout << "SndCWnd = " << fInfo->fCWnd << " ";
252 std::cout << "RTTEstimate = " << fInfo->fRTTEstimate << " ";
253 std::cout << "RtxFlag = " << fInfo->fRtxFlag;
254 }
255 else
256 std::cout << "fInfo = " << fInfo << " ";
257
258 std::cout << std::endl;
259
260}
261
262
263
264std::ostream & operator<<(std::ostream & out, TCP_Packet & msg)
265{
266 msg.print_header(out);
267 return out;
268}
269
270
271// -------------------- TCP_Sender ----------------------------------------
272TCP_Sender::TCP_Sender(int label) :
273 fLabel(label),
274 fTCPVersion(TCP_VERSION),
275 fMSS(TCP_SMSS),
276 fTCPIPHeaderLength(TCP_HEADERLENGTH),
277 fInitialRTT(TCP_INITIALRTT),
278 fInitialCWnd(0), // default initialization see below
279 fInitialSSThresh(0), // default initialization see below
280 fMaxCWnd(0), // default initialization see below
281 fDupACKThreshold(TCP_DUPACKS),
282 fTimerGranularity(TCP_STIMERGRAN),
283 fMaxRTO(TCP_MAXRTO),
284 fMaxBackoff(TCP_MAXBACKOFF),
285 fImmediateBackoffReset(TCP_IMMEDIATEBACKOFFRESET),
286 fKarn(TCP_KARN),
287 fGoBackN(TCP_GOBACKN),
288 fFlightSizeRecovery(TCP_FLIGHTSIZERECOVERY),
289 fRenoConservation(TCP_RENOCONSERVATION),
290 fCarefulSSThreshReduction(TCP_CAREFULSSTHRESHREDUCTION),
291 fIgnoreDupACKOnTORecovery(TCP_IGNOREDUPACKONTORECOVERY),
292 fCarefulMulFastRtxAvoidance(TCP_CAREFULMULFASTRTXAVOIDANCE),
293 fNagle(TCP_NAGLE),
294 fSWSATimerValue(TCP_SWSATIMERVALUE),
295 fRestartAfterIdle(TCP_RESTARTAFTERIDLE),
296 fDebug(false),
297 fTrace(false),
298 fSessionId(0),
299 fRtxTimer(*this, &TCP_Sender::HandleRtxTimeout),
300 fSWSATimer(*this, &TCP_Sender::HandleSWSATimeout)/*,*/
301{
302
303 // default values and parameter check for MaxCWND, InitCWND, InitSSThresh
304 if (fMaxCWnd == 0) {
305 fMaxCWnd = (unsigned)(TCP_MAXCWNDREL * fMSS);
306 }
307 else if (fMaxCWnd < fMSS) {
308 // throw (UL_CException("TCP_Sender::TCP_Sender",
309 // "MaxCWnd must be >= MSS"));
310 }
311
312 if (fInitialCWnd == 0) {
313 fInitialCWnd = (unsigned)(TCP_INITIALCWNDREL * fMSS);
314 }
315 else if ((fInitialCWnd < fMSS) || (fInitialCWnd > fMaxCWnd)) {
316 // throw (UL_CException("TCP_Sender::TCP_Sender",
317 // "initial CWnd must be >= MSS and <= MaxCWnd"));
318 }
319
320 if ((fInitialSSThresh == 0) && (fMaxCWnd >= 2 * fMSS)) {
321 fInitialSSThresh = (unsigned)(TCP_INITIALSSTHRESHREL * fMaxCWnd);
322 }
323 else if ((fInitialSSThresh < 2*fMSS) || (fInitialCWnd > fMaxCWnd)) {
324 // throw (UL_CException("TCP_Sender::TCP_Sender",
325 // "initial CWnd must be >= 2*MSS and <= MaxCWnd"));
326 }
327
328 setup();
329
330 InitStatistics();
331
332
333 tcp_send.set_name("TCP Send");
334 tcp_receive_ack.forward(this, &TCP_Sender::ReceiveMessageFromNet);
335 tcp_receive_ack.set_name("TCP ACK");
336 tcp_socket_write.forward(this, &TCP_Sender::HandleUserMessageIndication);
337 tcp_socket_write.set_name("SocketWrite");
338 tcp_release.forward(this, &TCP_Sender::release);
339 tcp_release.set_name("Release");
340
341}
342
343
344TCP_Sender::~TCP_Sender()
345{
346}
347
348void TCP_Sender::set_debug(const bool enable_debug)
349{
350 fDebug = enable_debug;
351 tcp_send.set_debug(enable_debug);
352}
353
354void TCP_Sender::set_debug(bool enable_debug, bool enable_signal_debug)
355{
356 fDebug = enable_debug;
357 tcp_send.set_debug(enable_signal_debug);
358}
359
360void TCP_Sender::set_trace(const bool enable_trace)
361{
362 fTrace = enable_trace;
363}
364
365void TCP_Sender::set_label(int label)
366{
367 fLabel = label;
368}
369
370void TCP_Sender::setup()
371{
372 fSndUna = 0;
373 fSndNxt = 0;
374 fSndMax = 0;
375 fMaxRecWnd = 0;
376 fRecWnd = fMaxCWnd;
377 fUserNxt = 0;
378 fCWnd = fInitialCWnd;
379 fSSThresh = fInitialSSThresh;
380 fRecoveryDupACK = 0;
381 fRecoveryTO = 0;
382 fDupACKCnt = 0;
383
384 // timers
385 fBackoff = 1;
386 fPendingBackoffReset = false;
387 fLastSendTime = Event_Queue::now();
388
389 // RTT measurement
390 fTimUna = 0;
391 fSRTT = 0;
392 fRTTVar = 0;
393 fRTTEstimate = fInitialRTT;
394 fRTTMPending = false;
395 fRTTMByte = 0;
396
397 CWnd_val.set_size(1000);
398 CWnd_val.zeros();
399 CWnd_time.set_size(1000);
400 CWnd_time.zeros();
401 CWnd_val(0) = fInitialCWnd;
402 CWnd_time(0) = 0;
403 CWnd_index = 1;
404
405 SSThresh_val.set_size(1000);
406 SSThresh_val.zeros();
407 SSThresh_time.set_size(1000);
408 SSThresh_time.zeros();
409 SSThresh_val(0) = fInitialSSThresh;
410 SSThresh_time(0) = 0;
411 SSThresh_index = 1;
412
413 sent_seq_num_val.set_size(1000);
414 sent_seq_num_val.zeros();
415 sent_seq_num_time.set_size(1000);
416 sent_seq_num_time.zeros();
417 sent_seq_num_val(0) = 0;
418 sent_seq_num_time(0) = 0;
419 sent_seq_num_index = 1;
420
421 sender_recv_ack_seq_num_val.set_size(1000);
422 sender_recv_ack_seq_num_val.zeros();
423 sender_recv_ack_seq_num_time.set_size(1000);
424 sender_recv_ack_seq_num_time.zeros();
425 sender_recv_ack_seq_num_val(0) = 0;
426 sender_recv_ack_seq_num_time(0) = 0;
427 sender_recv_ack_seq_num_index = 1;
428
429 RTTEstimate_val.set_size(1000);
430 RTTEstimate_val.zeros();
431 RTTEstimate_time.set_size(1000);
432 RTTEstimate_time.zeros();
433 RTTEstimate_val(0) = fInitialRTT;
434 RTTEstimate_time(0) = 0;
435 RTTEstimate_index = 1;
436
437 RTTsample_val.set_size(1000);
438 RTTsample_val.zeros();
439 RTTsample_time.set_size(1000);
440 RTTsample_time.zeros();
441 RTTsample_val(0) = 0;
442 RTTsample_time(0) = 0;
443 RTTsample_index = 1;
444
445}
446
447std::string TCP_Sender::GenerateFilename()
448{
449 time_t rawtime;
450#ifndef _MSC_VER
451 struct tm *timeinfo;
452 timeinfo = localtime(&rawtime);
453#else
454 time(&rawtime);
455 struct tm _timeinfo;
456 struct tm *timeinfo = &_timeinfo;
457 localtime_s(timeinfo, &rawtime);
458#endif
459 std::ostringstream filename_stream;
460 filename_stream << "trace_tcp_sender_u" << fLabel
461 << "_" << 1900 + timeinfo->tm_year
462 << "_" << timeinfo->tm_mon
463 << "_" << timeinfo->tm_mday
464 << "__" << timeinfo->tm_hour
465 << "_" << timeinfo->tm_min
466 << "_" << timeinfo->tm_sec
467 << "_.it";
468 return filename_stream.str();
469}
470
471
472void TCP_Sender::release(std::string file)
473{
474 std::string filename;
475 fSessionId++;
476
477 fRtxTimer.Reset();
478 fSWSATimer.Reset();
479
480 if (fTrace) {
481 if (file == "")
482 filename = GenerateFilename();
483 else
484 filename = file;
485
486 save_trace(filename);
487 }
488}
489
490
491void TCP_Sender::InitStatistics()
492{
493 fNumberOfTimeouts = 0;
494 fNumberOfIdleTimeouts = 0;
495 fNumberOfFastRetransmits = 0;
496 fNumberOfRTTMeasurements = 0;
497 fNumberOfReceivedACKs = 0;
498}
499
500
501void TCP_Sender::StopTransientPhase()
502{
503 InitStatistics();
504}
505
506
507void TCP_Sender::HandleUserMessageIndication(itpp::Packet *user_data_p)
508{
509 if (fDebug) {
510 std::cout << "TCP_Sender::HandleUserMessageIndication"
511 << " byte_size=" << user_data_p->bit_size() / 8
512 << " ptr=" << user_data_p
513 << " time=" << Event_Queue::now() << std::endl;
514 }
515
516 SocketWriteQueue.push(user_data_p);
517
518 SendNewData(); // will call GetMessage (via GetNextSegmentSize)
519 // if new data can be sent
520}
521
522
523void TCP_Sender::ReceiveMessageFromNet(itpp::Packet *msg)
524{
525 TCP_Packet & packet = (TCP_Packet &) * msg;
526
527 if (fDebug) {
528 std::cout << "TCP_Sender::ReceiveMessageFromNet"
529 << " byte_size=" << msg->bit_size() / 8
530 << " ptr=" << msg
531 << " time=" << Event_Queue::now() << std::endl;
532 }
533
534 if ((packet.get_session_id() == fSessionId) && // ACK of current session
535 (packet.get_ACK() >= fSndUna)) { // ACK is OK
536 HandleACK(packet);
537 }
538
539 delete &packet;
540}
541
542
543void TCP_Sender::HandleACK(TCP_Packet &msg)
544{
545 it_assert(msg.get_ACK() <= fSndMax, "TCP_Sender::HandleACK, received ACK > SndMax at ");
546
547 fNumberOfReceivedACKs++;
548
549 if (fTrace) {
550 TraceACKedSeqNo(msg.get_ACK());
551 }
552
553 if (fDebug) {
554 std::cout << "sender " << fLabel << ": "
555 << "receive ACK: "
556 << " t = " << Event_Queue::now() << ", "
557 << msg << std::endl;
558 }
559
560 // update receiver advertised window size
561 fRecWnd = msg.get_wnd();
562 fMaxRecWnd = max(fRecWnd, fMaxRecWnd);
563
564 if (msg.get_ACK() == fSndUna) { // duplicate ACK
565
566 bool ignoreDupACK = (fSndMax == fSndUna); // no outstanding data
567
568 if (fIgnoreDupACKOnTORecovery) {
569 // don't count dupacks during TO recovery!
570 if (fCarefulMulFastRtxAvoidance) { // see RFC 2582, Section 5
571 // like in Solaris
572 ignoreDupACK = ignoreDupACK || (fSndUna <= fRecoveryTO);
573 }
574 else {
575 // like in ns
576 ignoreDupACK = ignoreDupACK || (fSndUna < fRecoveryTO);
577 }
578 }
579
580 if (!ignoreDupACK) {
581 fDupACKCnt++; // count the number of duplicate ACKs
582
583 if (fDupACKCnt == fDupACKThreshold) {
584 // dupack threshold is reached
585 fNumberOfFastRetransmits++;
586
587 fRecoveryDupACK = fSndMax;
588
589 ReduceSSThresh(); // halve ssthresh (in most cases)
590
591 if ((fTCPVersion == kReno) || (fTCPVersion == kNewReno)) {
592 fCWnd = fSSThresh;
593 }
594 else if (fTCPVersion == kTahoe) {
595 fCWnd = fMSS;
596 }
597
598 if (fTCPVersion == kReno || fTCPVersion == kNewReno) {
599 // conservation of packets:
600 if (fRenoConservation) {
601 fCWnd += fDupACKThreshold * fMSS;
602 }
603 }
604 else if (fTCPVersion == kTahoe) {
605 if (fGoBackN) {
606 fSndNxt = fSndUna; // Go-Back-N (like in ns)
607 }
608 }
609
610 UnaRetransmit(); // initiate retransmission
611 }
612 else if (fDupACKCnt > fDupACKThreshold) {
613 if (fTCPVersion == kReno || fTCPVersion == kNewReno) {
614 // conservation of packets
615 // CWnd may exceed MaxCWnd during fast recovery,
616 // however, the result of SendWindow() is always <= MaxCwnd
617 if (fRenoConservation) {
618 fCWnd += fMSS;
619 }
620 }
621 }
622 }
623 }
624 else { // new ACK
625 Sequence_Number oldSndUna = fSndUna; // required for NewReno partial ACK
626 fSndUna = msg.get_ACK();
627 fSndNxt = max(fSndNxt, fSndUna); // required in case of "Go-Back-N"
628
629 // reset retransmission timer
630
631 if ((fSndUna > fTimUna) && fRtxTimer.IsPending()) {
632 // seq. no. for which rtx timer is running has been received
633 fRtxTimer.Reset();
634 }
635
636 // backoff reset
637
638 if (fImmediateBackoffReset) {
639 fBackoff = 1;
640 }
641 else {
642 if (fPendingBackoffReset) {
643 fBackoff = 1;
644 fPendingBackoffReset = false;
645 }
646 else if (fBackoff > 1) {
647 // reset backoff counter only on next new ACK (this is probably
648 // the way to operate intended by Karn)
649 fPendingBackoffReset = true;
650 }
651 }
652
653 // RTT measurement
654
655 if ((fSndUna > fRTTMByte) && fRTTMPending) {
656 UpdateRTTVariables(Event_Queue::now() - fRTTMStartTime);
657 fRTTMPending = false;
658 }
659
660 // update CWnd and reset dupack counter
661
662 if (fDupACKCnt >= fDupACKThreshold) {
663 // we are in fast recovery
664 if (fTCPVersion == kNewReno && fSndUna < fRecoveryDupACK) {
665 // New Reno partial ACK handling
666 if (fRenoConservation) {
667 fCWnd = max(fMSS, fCWnd - (fSndUna - oldSndUna) + fMSS);
668 }
669 UnaRetransmit(); // start retransmit immediately
670 }
671 else {
672 FinishFastRecovery();
673 }
674 }
675 else {
676 // no fast recovery
677 fDupACKCnt = 0;
678 if (fCWnd < fSSThresh) {
679 // slow start phase
680 fCWnd = min(fCWnd + fMSS, fMaxCWnd);
681 }
682 else {
683 // congestion avoidance phase
684 fCWnd += max(fMSS * fMSS / fCWnd, 1); // RFC 2581
685 fCWnd = min(fCWnd, fMaxCWnd);
686 }
687 }
688 } // new ACK
689
690 SendNewData(); // try to send new data (even in the case that a retransmit
691 // had to be performed)
692
693 if (fTrace) {
694 TraceCWnd();
695 }
696}
697
698
699void TCP_Sender::SendNewData(bool skipSWSA)
700{
701 unsigned nextSegmentSize;
702
703 it_assert(fSndUna <= fSndNxt, "TCP_Sender::SendNewData, SndUna > SndNxt in sender " + to_str(fLabel) + "!");
704
705 if (fRestartAfterIdle) {
706 IdleCheck();
707 }
708
709 bool sillyWindowAvoidanceFailed = false;
710
711 while (!sillyWindowAvoidanceFailed &&
712 ((nextSegmentSize = GetNextSegmentSize(fSndNxt)) > 0)) {
713 // there is new data to send and window is large enough
714
715 // SWSA and Nagle (RFC 1122): assume PUSH to be set
716 unsigned queuedUnsent = fUserNxt - fSndNxt;
717 unsigned usableWindow = max(0, (fSndUna + SendWindow()) - fSndNxt);
718
719 if (((unsigned)min(queuedUnsent, usableWindow) >= fMSS) ||
720 ((!fNagle || (fSndUna == fSndNxt)) &&
721 ((queuedUnsent <= usableWindow) || // Silly W. A.
722 ((unsigned)min(queuedUnsent, usableWindow) >= fMaxRecWnd / 2)
723 )
724 ) ||
725 skipSWSA
726 ) {
727 // Silly Window Syndrome Avoidance (SWSA) and Nagle passed
728
729 TCP_Segment nextSegment(fSndNxt, fSndNxt + nextSegmentSize);
730 TCP_Packet & msg = * new TCP_Packet();
731
732 msg.set_segment(nextSegment);
733 msg.set_session_id(fSessionId);
734 msg.set_destination_port(fLabel); // The dest and src port are set to the same
735 msg.set_source_port(fLabel); // number for simplicity.
736 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength));
737
738 if (fDebug) {
739 std::cout << "TCP_Sender::SendNewData,"
740 << " nextSegmentSize=" << nextSegmentSize
741 << " fTCPIPHeaderLength=" << fTCPIPHeaderLength
742 << " byte_size=" << msg.bit_size() / 8
743 << " ptr=" << &msg
744 << " time=" << Event_Queue::now() << std::endl;
745 }
746
747 // no RTT measurement for retransmitted segments
748 // changes on Dec. 13. 2002 (Ga, Bo, Scharf)
749
750 if (!fRTTMPending && fSndNxt >= fSndMax) { // ##Bo##
751 fRTTMStartTime = Event_Queue::now();
752 fRTTMByte = nextSegment.begin();
753 fRTTMPending = true;
754 }
755
756 fSndNxt += nextSegmentSize;
757 fSndMax = max(fSndNxt, fSndMax);
758
759 // reset SWSA timer if necessary
760 if (skipSWSA) {
761 skipSWSA = false;
762 }
763 else if (fSWSATimer.IsPending()) {
764 fSWSATimer.Reset();
765 }
766
767 // set rtx timer if necessary
768 if (!fRtxTimer.IsPending()) {
769 SetRtxTimer();
770 }
771
772
773 if (fDebug) {
774 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate,
775 fSndUna, fSndNxt, false);
776 std::cout << "sender " << fLabel
777 << ": send new data: "
778 << " t = " << Event_Queue::now() << ", "
779 << msg << std::endl;
780 }
781
782 SendMsg(msg);
783 }
784 else {
785 sillyWindowAvoidanceFailed = true;
786 // set SWSA timer
787 if (!fSWSATimer.IsPending()) {
788 fSWSATimer.Set(fSWSATimerValue);
789 }
790 }
791 }
792
793 // set timers in case that no new data could have been sent
794 if (!fRtxTimer.IsPending()) {
795 if (fSndMax > fSndUna) { // there is outstanding data
796 if (!fImmediateBackoffReset && fPendingBackoffReset) {
797 // backoff is reset if no new data could have been sent since last
798 // (successfull) retransmission; this is useful in case of
799 // Reno recovery and multiple losses to avoid that in
800 // the (unavoidable) series of timeouts the timer value
801 // increases exponentially as this is not the intention
802 // of the delayed backoff reset in Karn's algorithm
803 fBackoff = 1;
804 fPendingBackoffReset = false;
805 }
806 SetRtxTimer();
807 }
808 }
809}
810
811
812void TCP_Sender::UnaRetransmit()
813{
814 // resend after timeout or fast retransmit
815 unsigned nextSegmentSize = GetNextSegmentSize(fSndUna);
816
817 if (nextSegmentSize > 0) {
818 TCP_Segment nextSegment(fSndUna, fSndUna + nextSegmentSize);
819 TCP_Packet & msg = *new TCP_Packet();
820 msg.set_segment(nextSegment);
821 msg.set_session_id(fSessionId);
822 msg.set_destination_port(fLabel); // The dest and src port are set to the same
823 msg.set_source_port(fLabel); // number for simplicity.
824 msg.set_bit_size(8 * (nextSegmentSize + fTCPIPHeaderLength));
825
826 fSndNxt = max(fSndNxt, fSndUna + nextSegmentSize);
827 fSndMax = max(fSndNxt, fSndMax);
828
829 // The RTT measurement is cancelled if the RTTM byte has a sequence
830 // number higher or equal than the first retransmitted byte as
831 // the ACK for the RTTM byte will be delayed by the rtx for at least
832 // one round
833 if (fKarn && (nextSegment.begin() <= fRTTMByte) && fRTTMPending) {
834 fRTTMPending = false;
835 }
836
837 SetRtxTimer();
838
839 if (fDebug) {
840 msg.set_info(fSSThresh, fRecWnd, fCWnd, fRTTEstimate,
841 fSndUna, fSndNxt, true);
842 std::cout << "sender " << fLabel;
843 if (fDupACKCnt >= fDupACKThreshold) {
844 std::cout << ": fast rtx: ";
845 }
846 else {
847 std::cout << ": TO rtx: ";
848 }
849 std::cout << " t = " << Event_Queue::now() << ", "
850 << msg << std::endl;
851 }
852
853 SendMsg(msg);
854 }
855 else {
856 // throw(UL_CException("TCP_Sender::UnaRetransmit", "no bytes to send"));
857 }
858}
859
860
861void TCP_Sender::FinishFastRecovery()
862{
863 if (fTCPVersion == kTahoe) {
864 fDupACKCnt = 0;
865 }
866 else if (fTCPVersion == kReno) {
867 // Reno fast recovery
868 fDupACKCnt = 0;
869 if (fFlightSizeRecovery) {
870 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh);
871 }
872 else {
873 fCWnd = fSSThresh;
874 }
875 }
876 else if (fTCPVersion == kNewReno) {
877 // New Reno fast recovery
878 // "Set CWnd to ... min (ssthresh, FlightSize + MSS)
879 // ... or ssthresh" (RFC 2582)
880 if (fFlightSizeRecovery) {
881 fCWnd = min(fSndMax - fSndUna + fMSS, fSSThresh);
882 }
883 else {
884 fCWnd = fSSThresh;
885 }
886 fDupACKCnt = 0;
887 }
888}
889
890
891void TCP_Sender::ReduceSSThresh()
892{
893 if (fCarefulSSThreshReduction) {
894 // If Reno conservation is enabled the amount of
895 // outstanding data ("flight size") might be rather large
896 // and even larger than twice the old ssthresh value;
897 // so this corresponds more to the ns behaviour where always cwnd is
898 // taken instead of flight size.
899 fSSThresh = max(2 * fMSS,
900 min(min(fCWnd, fSndMax - fSndUna), fRecWnd) / 2);
901 }
902 else {
903 // use filght size / 2 as recommended in RFC 2581
904 fSSThresh = max(2 * fMSS, min(fSndMax - fSndUna, fRecWnd) / 2);
905 }
906
907 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleACK, internal error: SndSSThresh is > MaxCWnd");
908
909 if (fTrace) {
910 TraceSSThresh();
911 }
912}
913
914
915void TCP_Sender::SendMsg(TCP_Packet &msg)
916{
917 if (fTrace) {
918 TraceSentSeqNo(msg.get_segment().end());
919 }
920
921 if (fRestartAfterIdle) {
922 fLastSendTime = Event_Queue::now(); // needed for idle detection
923 }
924
925 tcp_send(&msg);
926}
927
928
929void TCP_Sender::IdleCheck()
930{
931 // idle detection according to Jacobson, SIGCOMM, 1988:
932 // sender is currently idle and nothing has been send since RTO
933
934 if (fSndMax == fSndUna && Event_Queue::now() - fLastSendTime > CalcRTOValue()) {
935 fCWnd = fInitialCWnd; // see RFC2581
936
937 fNumberOfIdleTimeouts++;
938
939 if (fTrace) {
940 TraceCWnd();
941 }
942
943 if (fDebug) {
944 std::cout << "sender " << fLabel
945 << ": idle timeout: "
946 << "t = " << Event_Queue::now()
947 << ", SndNxt = " << fSndNxt
948 << ", SndUna = " << fSndUna
949 << ", Backoff = " << fBackoff
950 << std::endl;
951 }
952 }
953}
954
955
956void TCP_Sender::HandleRtxTimeout(Ttype)
957{
958 fNumberOfTimeouts++;
959
960 // update backoff
961 fBackoff = min(fMaxBackoff, fBackoff * 2);
962 if (!fImmediateBackoffReset) {
963 fPendingBackoffReset = false;
964 }
965
966 if (fDupACKCnt >= fDupACKThreshold) {
967 FinishFastRecovery(); // reset dup ACK cnt and CWnd
968 }
969 else if (fDupACKCnt > 0) {
970 fDupACKCnt = 0; // don't allow dupack action during TO recovery
971 }
972
973 // update CWnd and SSThresh
974 ReduceSSThresh(); // halve ssthresh (in most cases)
975 fCWnd = fMSS; // not initial CWnd, see RFC 2581
976
977 it_assert(fSSThresh <= fMaxCWnd, "TCP_Sender::HandleRtxTimeout, internal error: SndSSThresh is > MaxCWnd");
978
979 fRecoveryTO = fSndMax;
980
981 if (fGoBackN) {
982 // go back N is mainly relevant in the case of multiple losses
983 // which would lead to a series of timeouts without resetting sndnxt
984 fSndNxt = fSndUna;
985 }
986
987 if (fDebug) {
988 std::cout << "sender " << fLabel
989 << ": rtx timeout: "
990 << "t = " << Event_Queue::now()
991 << ", SndNxt = " << fSndNxt
992 << ", SndUna = " << fSndUna
993 << std::endl;
994 }
995
996 if (fTrace) {
997 TraceCWnd();
998 }
999
1000 UnaRetransmit(); // initiate retransmission
1001}
1002
1003
1004void TCP_Sender::HandleSWSATimeout(Ttype)
1005{
1006 SendNewData(true);
1007}
1008
1009
1010unsigned TCP_Sender::GetNextSegmentSize(const Sequence_Number & begin)
1011{
1012 // try to get new user messages if available and necessary
1013 while ((fUserNxt < begin + fMSS) && (!SocketWriteQueue.empty())) {
1014 itpp::Packet *packet_p = SocketWriteQueue.front();
1015 SocketWriteQueue.pop();
1016 fUserNxt += (unsigned) packet_p->bit_size() / 8;
1017 delete packet_p;
1018 }
1019
1020 Sequence_Number end = min(min(fUserNxt, begin + fMSS),
1021 fSndUna + SendWindow());
1022
1023 if (fDebug) {
1024 std::cout << "TCP_Sender::GetNextSegmentSize,"
1025 << " fUserNxt=" << fUserNxt
1026 << " begin_seq_num=" << begin
1027 << " fMSS=" << fMSS
1028 << " fSndUna=" << fSndUna
1029 << " SendWindow()=" << SendWindow()
1030 << " end_seq_num=" << end
1031 << " time=" << Event_Queue::now() << std::endl;
1032 }
1033
1034 return max(0, end - begin);
1035}
1036
1037
1038unsigned TCP_Sender::SendWindow() const
1039{
1040 return min(fRecWnd, min(fMaxCWnd, fCWnd));
1041}
1042
1043
1044double TCP_Sender::CalcRTOValue() const
1045{
1046 static const double factor = 1 + 1e-8;
1047 // to avoid "simultaneous" TO/receive ACK events in case of const. RTT
1048
1049 double rto = fBackoff * fRTTEstimate * factor;
1050
1051 if (rto > fMaxRTO) {
1052 rto = fMaxRTO;
1053 }
1054
1055 return rto;
1056}
1057
1058
1059void TCP_Sender::SetRtxTimer()
1060{
1061 double rto = CalcRTOValue();
1062 fRtxTimer.Set(rto);
1063 fTimUna = fSndUna;
1064 if (fDebug) {
1065 std::cout << "sender " << fLabel
1066 << ": set rtx timer: "
1067 << "t = " << Event_Queue::now()
1068 << ", RTO = " << rto
1069 << ", Backoff = " << fBackoff
1070 << ", TimUna = " << fTimUna
1071 << std::endl;
1072 }
1073}
1074
1075
1076void TCP_Sender::UpdateRTTVariables(double sampleRTT)
1077{
1078 if (fSRTT == 0) {
1079 fSRTT = sampleRTT;
1080 fRTTVar = sampleRTT / 2;
1081 }
1082 else {
1083 // see, e.g., Comer for the values used as weights
1084 fSRTT = 0.875 * fSRTT + 0.125 * sampleRTT;
1085 fRTTVar = 0.75 * fRTTVar + 0.25 * fabs(sampleRTT - fSRTT);
1086 }
1087
1088 fRTTEstimate = round(fSRTT + 4 * fRTTVar, fTimerGranularity);
1089
1090 if (fTrace) {
1091 TraceRTTVariables(sampleRTT);
1092 }
1093
1094 fNumberOfRTTMeasurements++;
1095}
1096
1097
1098void TCP_Sender::TraceRTTVariables(double sampleRTT)
1099{
1100 if (fDebug) {
1101 std::cout << "sender " << fLabel
1102 << ": RTT update: "
1103 << "t = " << Event_Queue::now()
1104 << ", sample = " << sampleRTT
1105 << ", SRTT = " << fSRTT
1106 << ", RTTVar = " << fRTTVar
1107 << ", RTTEstimate = " << fRTTEstimate
1108 << std::endl;
1109 }
1110
1111 if (RTTsample_index >= RTTsample_time.size()) {
1112 RTTsample_time.set_size(2*RTTsample_time.size(), true);
1113 RTTsample_val.set_size(2*RTTsample_val.size(), true);
1114 }
1115 RTTsample_val(RTTsample_index) = sampleRTT;
1116 RTTsample_time(RTTsample_index) = Event_Queue::now();
1117 RTTsample_index++;
1118
1119 if (RTTEstimate_index >= RTTEstimate_time.size()) {
1120 RTTEstimate_time.set_size(2*RTTEstimate_time.size(), true);
1121 RTTEstimate_val.set_size(2*RTTEstimate_val.size(), true);
1122 }
1123 RTTEstimate_val(RTTEstimate_index) = fRTTEstimate;
1124 RTTEstimate_time(RTTEstimate_index) = Event_Queue::now();
1125 RTTEstimate_index++;
1126}
1127
1128
1129void TCP_Sender::TraceCWnd()
1130{
1131 if (fDebug) {
1132 std::cout << "sender " << fLabel
1133 << " t = " << Event_Queue::now()
1134 << " cwnd = " << fCWnd << std::endl;
1135 }
1136 if (CWnd_index >= CWnd_time.size()) {
1137 CWnd_time.set_size(2*CWnd_time.size(), true);
1138 CWnd_val.set_size(2*CWnd_val.size(), true);
1139 }
1140 CWnd_val(CWnd_index) = fCWnd;
1141 CWnd_time(CWnd_index) = Event_Queue::now();
1142 CWnd_index++;
1143
1144}
1145
1146void TCP_Sender::TraceSSThresh()
1147{
1148 if (fDebug) {
1149 std::cout << "sender " << fLabel
1150 << " t = " << Event_Queue::now()
1151 << " cwnd = " << fSSThresh << std::endl;
1152 }
1153 if (SSThresh_index >= SSThresh_time.size()) {
1154 SSThresh_time.set_size(2*SSThresh_time.size(), true);
1155 SSThresh_val.set_size(2*SSThresh_val.size(), true);
1156 }
1157 SSThresh_val(SSThresh_index) = fSSThresh;
1158 SSThresh_time(SSThresh_index) = Event_Queue::now();
1159 SSThresh_index++;
1160
1161}
1162
1163void TCP_Sender::TraceSentSeqNo(const Sequence_Number sn)
1164{
1166 if (fDebug) {
1167 std::cout << "sender " << fLabel
1168 << " t = " << Event_Queue::now()
1169 << " sent = " << sn
1170 << std::endl;
1171 }
1172 if (sent_seq_num_index >= sent_seq_num_time.size()) {
1173 sent_seq_num_time.set_size(2*sent_seq_num_time.size(), true);
1174 sent_seq_num_val.set_size(2*sent_seq_num_val.size(), true);
1175 }
1176 sent_seq_num_val(sent_seq_num_index) = sn.value();
1177 sent_seq_num_time(sent_seq_num_index) = Event_Queue::now();
1178 sent_seq_num_index++;
1179}
1180
1181
1182void TCP_Sender::TraceACKedSeqNo(const Sequence_Number sn)
1183{
1184 if (fDebug) {
1185 std::cout << "sender " << fLabel
1186 << " t = " << Event_Queue::now()
1187 << " ACK = " << sn
1188 << std::endl;
1189 }
1190
1191 if (sender_recv_ack_seq_num_index >= sender_recv_ack_seq_num_time.size()) {
1192 sender_recv_ack_seq_num_time.set_size(2*sender_recv_ack_seq_num_time.size(), true);
1193 sender_recv_ack_seq_num_val.set_size(2*sender_recv_ack_seq_num_val.size(), true);
1194 }
1195 sender_recv_ack_seq_num_val(sender_recv_ack_seq_num_index) = sn.value();
1196 sender_recv_ack_seq_num_time(sender_recv_ack_seq_num_index) = Event_Queue::now();
1197 sender_recv_ack_seq_num_index++;
1198}
1199
1200
1201void TCP_Sender::save_trace(std::string filename)
1202{
1203
1204 CWnd_val.set_size(CWnd_index, true);
1205 CWnd_time.set_size(CWnd_index, true);
1206
1207 SSThresh_val.set_size(SSThresh_index, true);
1208 SSThresh_time.set_size(SSThresh_index, true);
1209
1210 sent_seq_num_val.set_size(sent_seq_num_index, true);
1211 sent_seq_num_time.set_size(sent_seq_num_index, true);
1212
1213 sender_recv_ack_seq_num_val.set_size(sender_recv_ack_seq_num_index, true);
1214 sender_recv_ack_seq_num_time.set_size(sender_recv_ack_seq_num_index, true);
1215
1216 RTTEstimate_val.set_size(RTTEstimate_index, true);
1217 RTTEstimate_time.set_size(RTTEstimate_index, true);
1218
1219 RTTsample_val.set_size(RTTsample_index, true);
1220 RTTsample_time.set_size(RTTsample_index, true);
1221
1222 if (fDebug) {
1223 std::cout << "CWnd_val" << CWnd_val << std::endl;
1224 std::cout << "CWnd_time" << CWnd_time << std::endl;
1225 std::cout << "CWnd_index" << CWnd_index << std::endl;
1226
1227 std::cout << "SSThresh_val" << SSThresh_val << std::endl;
1228 std::cout << "SSThresh_time" << SSThresh_time << std::endl;
1229 std::cout << "SSThresh_index" << SSThresh_index << std::endl;
1230
1231 std::cout << "sent_seq_num_val" << sent_seq_num_val << std::endl;
1232 std::cout << "sent_seq_num_time" << sent_seq_num_time << std::endl;
1233 std::cout << "sent_seq_num_index" << sent_seq_num_index << std::endl;
1234
1235 std::cout << "sender_recv_ack_seq_num_val" << sender_recv_ack_seq_num_val << std::endl;
1236 std::cout << "sender_recv_ack_seq_num_time" << sender_recv_ack_seq_num_time << std::endl;
1237 std::cout << "sender_recv_ack_seq_num_index" << sender_recv_ack_seq_num_index << std::endl;
1238
1239 std::cout << "RTTEstimate_val" << RTTEstimate_val << std::endl;
1240 std::cout << "RTTEstimate_time" << RTTEstimate_time << std::endl;
1241 std::cout << "RTTEstimate_index" << RTTEstimate_index << std::endl;
1242
1243 std::cout << "RTTsample_val" << RTTsample_val << std::endl;
1244 std::cout << "RTTsample_time" << RTTsample_time << std::endl;
1245 std::cout << "RTTsample_index" << RTTsample_index << std::endl;
1246
1247 std::cout << "TCP_Sender::saving to file: " << filename << std::endl;
1248 }
1249
1250 it_file ff2;
1251 ff2.open(filename);
1252
1253 ff2 << Name("CWnd_val") << CWnd_val;
1254 ff2 << Name("CWnd_time") << CWnd_time;
1255 ff2 << Name("CWnd_index") << CWnd_index;
1256
1257 ff2 << Name("SSThresh_val") << SSThresh_val;
1258 ff2 << Name("SSThresh_time") << SSThresh_time;
1259 ff2 << Name("SSThresh_index") << SSThresh_index;
1260
1261 ff2 << Name("sent_seq_num_val") << sent_seq_num_val;
1262 ff2 << Name("sent_seq_num_time") << sent_seq_num_time;
1263 ff2 << Name("sent_seq_num_index") << sent_seq_num_index;
1264
1265 ff2 << Name("sender_recv_ack_seq_num_val") << sender_recv_ack_seq_num_val;
1266 ff2 << Name("sender_recv_ack_seq_num_time") << sender_recv_ack_seq_num_time;
1267 ff2 << Name("sender_recv_ack_seq_num_index") << sender_recv_ack_seq_num_index;
1268
1269 ff2 << Name("RTTEstimate_val") << RTTEstimate_val;
1270 ff2 << Name("RTTEstimate_time") << RTTEstimate_time;
1271 ff2 << Name("RTTEstimate_index") << RTTEstimate_index;
1272
1273 ff2 << Name("RTTsample_val") << RTTsample_val;
1274 ff2 << Name("RTTsample_time") << RTTsample_time;
1275 ff2 << Name("RTTsample_index") << RTTsample_index;
1276
1277 ff2.flush();
1278 ff2.close();
1279}
1280
1281
1282void TCP_Sender::print_item(std::ostream &, const std::string & keyword)
1283{
1284 if (keyword == "Label") {
1285 std::cout << fLabel;
1286 }
1287 else if (keyword == "CWnd") {
1288 std::cout << fCWnd;
1289 }
1290 else if (keyword == "SSThresh") {
1291 std::cout << fSSThresh;
1292 }
1293 else if (keyword == "SRTT") {
1294 std::cout << fSRTT;
1295 }
1296 else if (keyword == "RTTvar") {
1297 std::cout << fRTTVar;
1298 }
1299 else if (keyword == "Backoff") {
1300 std::cout << fBackoff;
1301 }
1302 else if (keyword == "RTO") {
1303 std::cout << CalcRTOValue();
1304 }
1305 else if (keyword == "NoOfFastRets") {
1306 std::cout << fNumberOfFastRetransmits;
1307 }
1308 else if (keyword == "NoOfRetTOs") {
1309 std::cout << fNumberOfTimeouts;
1310 }
1311 else if (keyword == "NoOfIdleTOs") {
1312 std::cout << fNumberOfIdleTimeouts;
1313 }
1314 else if (keyword == "NoOfRTTMs") {
1315 std::cout << fNumberOfRTTMeasurements;
1316 }
1317 else if (keyword == "NoOfRecACKs") {
1318 std::cout << fNumberOfReceivedACKs;
1319 }
1320 else {
1321 }
1322}
1323
1324
1325// -------------------- TCP_Receiver_Buffer ----------------------------------------
1326TCP_Receiver_Buffer::TCP_Receiver_Buffer() :
1327 fFirstByte()
1328{
1329}
1330
1331
1332TCP_Receiver_Buffer::TCP_Receiver_Buffer(const TCP_Receiver_Buffer & rhs) :
1333 fFirstByte(rhs.fFirstByte),
1334 fBufList(rhs.fBufList)
1335{
1336}
1337
1338
1339void TCP_Receiver_Buffer::reset()
1340{
1341 fBufList.clear();
1342 fFirstByte = 0;
1343}
1344
1345
1346TCP_Receiver_Buffer::~TCP_Receiver_Buffer()
1347{
1348}
1349
1350
1351void TCP_Receiver_Buffer::write(TCP_Segment newBlock)
1352{
1353 // error cases
1354 it_assert(newBlock.begin() <= newBlock.end(), "TCP_Receiver_Buffer::Write, no valid segment");
1355
1356 // cut blocks beginning before fFirstByte
1357 if (newBlock.begin() < fFirstByte) {
1358 if (newBlock.end() > fFirstByte) {
1359 newBlock.set_begin(fFirstByte);
1360 }
1361 else {
1362 return; //// TODO: Is this strange?
1363 }
1364 }
1365
1366 if (newBlock.length() == 0) { // empty block, nothing to do
1367 return;
1368 }
1369
1370 if (fBufList.empty() || (newBlock.begin() > fBufList.back().end())) {
1371 // new block is behind last block in buffer
1372 fBufList.push_back(newBlock);
1373 }
1374 else {
1375 // skip list entries if beginning of newBlock > end of current one
1376 // (search for correct list position)
1377 std::list<TCP_Segment>::iterator iter;
1378 iter = fBufList.begin();
1379 while (newBlock.begin() > iter->end()) {
1380 iter++;
1381 it_assert(iter != fBufList.end(), "TCP_Receiver_Buffer::Write, internal error");
1382 }
1383
1384 TCP_Segment & exBlock = *iter;
1385
1386 if (exBlock.can_be_combined(newBlock)) {
1387 // overlapping or contiguous blocks -> combine
1388 exBlock.combine(newBlock);
1389
1390 // check following blocks
1391 iter++;
1392 while ((iter != fBufList.end()) &&
1393 exBlock.can_be_combined(*iter)) {
1394 exBlock.combine(*iter);
1395 iter = fBufList.erase(iter);
1396 }
1397 }
1398 else {
1399 // no overlap, newBlock lies between two existing list entries
1400 // new list entry has to be created
1401
1402 fBufList.insert(iter, newBlock);
1403 }
1404 }
1405
1406 it_assert(!fBufList.empty() && fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Write, internal error");
1407
1408}
1409
1410
1411// The amount of data read from the buffer is given as parameter. It has
1412// to be less than or equal to the size of the first block stored. This
1413// mean the caller of Read should first check how much data is available
1414// by calling FirstBlockSize.
1415void TCP_Receiver_Buffer::read(unsigned noOfBytes)
1416{
1417 it_assert(first_block_size() > 0, "TCP_Receiver_Buffer::Read, No block to read");
1418 it_assert(noOfBytes <= first_block_size(), "TCP_Receiver_Buffer::Read, submitted block size not valid");
1419
1420
1421 if (noOfBytes < first_block_size()) {
1422 fBufList.front().set_begin(fBufList.front().begin() + noOfBytes);
1423 }
1424 else { // first block will be read completely
1425 fBufList.pop_front();
1426 }
1427 fFirstByte += noOfBytes;
1428
1429 it_assert(fBufList.empty() || fBufList.front().begin() >= fFirstByte, "TCP_Receiver_Buffer::Read, internal error");
1430}
1431
1432
1433// FirstBlockSize returns the size of the first block stored in the
1434// buffer or 0 if the buffer is empty
1435unsigned TCP_Receiver_Buffer::first_block_size() const
1436{
1437 if (!fBufList.empty() && (fBufList.front().begin() == fFirstByte)) {
1438 return fBufList.front().length();
1439 }
1440 else {
1441 return 0;
1442 }
1443}
1444
1445
1446std::ostream & TCP_Receiver_Buffer::info(std::ostream &os, int detail) const
1447{
1448 os << "receiver buffer information" << std::endl
1449 << "number of blocks: " << fBufList.size() << std::endl
1450 << "first byte stored: " << fFirstByte << std::endl
1451 << "last byte stored +1: " << last_byte() << std::endl
1452 << "next byte expected: " << next_expected() << std::endl;
1453
1454 if (detail > 0) {
1455 os << "segments in receiver buffer:" << std::endl;
1456
1457 typedef std::list<TCP_Segment>::const_iterator LI;
1458 for (LI i = fBufList.begin(); i != fBufList.end(); ++i) {
1459 const TCP_Segment & block = *i;
1460 os << ". segment: " << block << std::endl;
1461 }
1462
1463 }
1464
1465 return os;
1466}
1467
1468
1469// -------------------- TCP_Receiver ----------------------------------------
1470TCP_Receiver::TCP_Receiver(int label) :
1471 fReceiverBuffer(),
1472 fLabel(label),
1473 fTCPIPHeaderLength(TCP_HEADERLENGTH),
1474 fMSS(TCP_RMSS),
1475 fBufferSize(TCP_BUFFERSIZE),
1476 fDelayedACK(TCP_DELAYEDACK),
1477 fACKDelayTime(TCP_ACKDELAYTIME),
1478 fSendPeriodicACKs(TCP_SENDPERIODICACKS),
1479 fStrictPeriodicACKs(TCP_STRICTPERIODICACKS),
1480 fPeriodicACKInterval(TCP_PERIODICACKINTERVAL),
1481 fACKSchedulingDelay(TCP_ACKSCHEDULINGDELAY),
1482 fACKOnBufferWrite(TCP_ACKBUFFERWRITE),
1483 fACKOnBufferRead(TCP_ACKBUFFERREAD),
1484 fMaxUserBlockSize(TCP_MAXUSERBLOCKSIZE),
1485 fMinUserBlockSize(TCP_MINUSERBLOCKSIZE),
1486 fUserBlockProcDelay(TCP_USERBLOCKPROCDELAY),
1487 fTrace(false),
1488 fDebug(false),
1489 fSessionId(0),
1490 fDelayedACKTimer(*this, &TCP_Receiver::DelayedACKHandler),
1491 fPeriodicACKTimer(*this, &TCP_Receiver::PeriodicACKHandler),
1492 fACKSchedulingTimer(*this, &TCP_Receiver::SendACKMessage),
1493 fWaitingACKMsg(0),
1494 fUserBlockProcTimer(*this, &TCP_Receiver::HandleEndOfProcessing)
1495{
1496 fUserMessage = NULL;
1497
1498
1499 if (!fACKOnBufferRead && !fACKOnBufferWrite) {
1500 // throw(UL_CException("TCP_Receiver::TCP_Receiver",
1501 // "ACKs must be sent on buffer read or write or both"));
1502 }
1503
1504 setup();
1505
1506 tcp_receive.forward(this, &TCP_Receiver::ReceiveMessageFromNet);
1507 tcp_receive.set_name("TCP Receive");
1508 tcp_send_ack.set_name("TCP send ACK");
1509 tcp_new_data.set_name("TCP New Data");
1510 tcp_release.forward(this, &TCP_Receiver::release);
1511 tcp_release.set_name("TCP Release");
1512
1513}
1514
1515
1516TCP_Receiver::~TCP_Receiver()
1517{
1518 delete fWaitingACKMsg;
1519 delete fUserMessage;
1520}
1521
1522
1523void TCP_Receiver::set_debug(const bool enable_debug)
1524{
1525 fDebug = enable_debug;
1526 tcp_send_ack.set_debug(enable_debug);
1527 tcp_new_data.set_debug();
1528}
1529
1530void TCP_Receiver::set_debug(bool enable_debug, bool enable_signal_debug)
1531{
1532 fDebug = enable_debug;
1533 tcp_send_ack.set_debug(enable_signal_debug);
1534 tcp_new_data.set_debug();
1535}
1536
1537void TCP_Receiver::set_trace(const bool enable_trace)
1538{
1539 fTrace = enable_trace;
1540}
1541
1542
1543
1544void TCP_Receiver::setup()
1545{
1546 fAdvRcvWnd = 0;
1547 fAdvRcvNxt = 0;
1548
1549 if (fSendPeriodicACKs) {
1550 fPeriodicACKTimer.Set(fPeriodicACKInterval);
1551 }
1552
1553 fReceiverBuffer.reset();
1554
1555 received_seq_num_val.set_size(1000);
1556 received_seq_num_val.zeros();
1557 received_seq_num_time.set_size(1000);
1558 received_seq_num_time.zeros();
1559 received_seq_num_val(0) = 0;
1560 received_seq_num_time(0) = 0;
1561 received_seq_num_index = 1;
1562}
1563
1564std::string TCP_Receiver::GenerateFilename()
1565{
1566 time_t rawtime;
1567#ifndef _MSC_VER
1568 struct tm *timeinfo;
1569 timeinfo = localtime(&rawtime);
1570#else
1571 time(&rawtime);
1572 struct tm _timeinfo;
1573 struct tm *timeinfo = &_timeinfo;
1574 localtime_s(timeinfo, &rawtime);
1575#endif
1576 std::ostringstream filename_stream;
1577 filename_stream << "trace_tcp_receiver_u" << fLabel
1578 << "_" << 1900 + timeinfo->tm_year
1579 << "_" << timeinfo->tm_mon
1580 << "_" << timeinfo->tm_mday
1581 << "__" << timeinfo->tm_hour
1582 << "_" << timeinfo->tm_min
1583 << "_" << timeinfo->tm_sec
1584 << "_.it";
1585 return filename_stream.str();
1586}
1587
1588void TCP_Receiver::release(std::string file)
1589{
1590 std::string filename;
1591 fSessionId++;
1592
1593 if (fWaitingACKMsg != 0) {
1594 delete fWaitingACKMsg;
1595 fWaitingACKMsg = 0;
1596 }
1597 if (fUserMessage != 0) {
1598 delete fUserMessage;
1599 fUserMessage = 0;
1600 }
1601
1602 fUserBlockProcTimer.Reset();
1603 fDelayedACKTimer.Reset();
1604 fPeriodicACKTimer.Reset();
1605 fACKSchedulingTimer.Reset();
1606
1607 if (fTrace) {
1608 if (file == "")
1609 filename = GenerateFilename();
1610 else
1611 filename = file;
1612
1613 save_trace(filename);
1614 }
1615}
1616
1617
1618void TCP_Receiver::ReceiveMessageFromNet(itpp::Packet *msg)
1619{
1620 TCP_Packet & packet = (TCP_Packet &) * msg;
1621 if (packet.get_destination_port() == fLabel) {
1622 if (packet.get_session_id() == fSessionId) {
1623 ReceiveDataPacket(packet);
1624 }
1625 else {
1626 it_warning("Received a TCP packet with wrong SessionId");
1627 std::cout << "TCP_Receiver::ReceiveMessageFromNet, "
1628 << "fLabel= " << fLabel
1629 << "fSessionId= " << fSessionId << std::endl;
1630 std::cout << "packet=" << packet
1631 << ", next exp. = " << fReceiverBuffer.next_expected()
1632 << std::endl;
1633 exit(0);
1634 }
1635 }
1636 else {
1637 it_warning("Received a TCP packet with label");
1638 exit(0);
1639 }
1640}
1641
1642
1643void TCP_Receiver::ReceiveDataPacket(TCP_Packet &msg)
1644{
1645 TCP_Segment segment = msg.get_segment();
1646
1647 bool isOutOfOrder = (segment.begin() > fReceiverBuffer.next_expected()) ||
1648 (segment.end() <= fReceiverBuffer.next_expected());
1649
1650 if (fDebug) {
1651 std::cout << "TCP_Receiver::ReceiveDataPacket receiver: " << fLabel << ": "
1652 << "receive msg: "
1653 << "t = " << Event_Queue::now()
1654 << ", next exp. = " << fReceiverBuffer.next_expected()
1655 << ", " << msg << std::endl;
1656 }
1657
1658 if (fTrace) {
1659 TraceReceivedSeqNo(segment.end());
1660 }
1661
1662 it_assert(segment.end() <= fReceiverBuffer.first_byte() + fBufferSize, "TCP_Receiver::ReceiveTCPPacket, packet exceeds window at ");
1663 it_assert(segment.begin() < segment.end(), "TCP_Receiver::ReceiveTCPPacket, silly packet received at ");
1664
1665 fReceiverBuffer.write(segment);
1666
1667 if (isOutOfOrder) {
1668 SendACK(true); // create dupack conditionless
1669 }
1670 else {
1671 if (fACKOnBufferWrite) {
1672 SendACK(false);
1673 }
1674 IndicateUserMessage();
1675 }
1676
1677 delete &msg;
1678}
1679
1680
1681void TCP_Receiver::IndicateUserMessage()
1682{
1683 if (fUserMessage == 0) {
1684 // receive a block
1685 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(),
1686 fMaxUserBlockSize);
1687
1688 if (fDebug) {
1689 std::cout << "TCP_Receiver::IndicateUserMessage "
1690 << "t = " << Event_Queue::now()
1691 << " noOfBytes = " << noOfBytes
1692 << " firstBlock = " << fReceiverBuffer.first_block_size()
1693 << std::endl;
1694 }
1695
1696 if (noOfBytes >= fMinUserBlockSize) {
1697 fUserMessage = new Packet();
1698 fUserMessage->set_bit_size(8*noOfBytes);
1699 fUserBlockProcTimer.Set(fUserBlockProcDelay);
1700 }
1701 }
1702}
1703
1704
1705bool TCP_Receiver::is_user_message_available()
1706{
1707 if (fUserMessage != 0) {
1708 return true;
1709 }
1710
1711 unsigned noOfBytes = min(fReceiverBuffer.first_block_size(),
1712 fMaxUserBlockSize);
1713
1714 if (noOfBytes >= fMinUserBlockSize) {
1715 fUserMessage = new Packet();
1716 fUserMessage->set_bit_size(8*noOfBytes);
1717 return true;
1718 }
1719 else {
1720 return false;
1721 }
1722}
1723
1724
1725itpp::Packet & TCP_Receiver::get_user_message()
1726{
1727 it_assert(fUserMessage != 0, "TCP_Receiver::GetUserMessage, no message available");
1728 if (fDebug) {
1729 std::cout << "TCP_Receiver::GetUserMessage "
1730 << "receiver: " << fLabel << ": "
1731 << "read from buffer: "
1732 << "t = " << Event_Queue::now()
1733 << ", user msg length = " << (fUserMessage->bit_size() / 8)
1734 << ", first byte = " << fReceiverBuffer.first_byte()
1735 << ", first block size = " << fReceiverBuffer.first_block_size()
1736 << std::endl;
1737 }
1738
1739 fReceiverBuffer.read(fUserMessage->bit_size() / 8);
1740 if (fACKOnBufferRead) {
1741 SendACK(false); // send acknowledgement
1742 }
1743
1744 itpp::Packet & msg = *fUserMessage;
1745 fUserMessage = 0;
1746
1747 if (fReceiverBuffer.first_block_size() > 0) {
1748 IndicateUserMessage();
1749 }
1750
1751 return msg;
1752}
1753
1754
1755
1756void TCP_Receiver::HandleEndOfProcessing(Ttype)
1757{
1758 it_assert(fUserMessage != 0, "TCP_Receiver::HandleEndOfProcessing, no message available");
1759
1760
1761 tcp_new_data(fLabel);
1762}
1763
1764
1765void TCP_Receiver::DelayedACKHandler(Ttype)
1766{
1767 if (fDebug) {
1768 std::cout << "TCP_Receiver::DelayedACKHandler "
1769 << "receiver " << fLabel
1770 << ": delACK TO: "
1771 << "t = " << Event_Queue::now() << std::endl;
1772 }
1773
1774 SendACK(true);
1775}
1776
1777
1778void TCP_Receiver::PeriodicACKHandler(Ttype)
1779{
1780 if (fDebug) {
1781 std::cout << "TCP_Receiver::PeriodicACKHandler"
1782 << "receiver " << fLabel
1783 << ": periodicACK TO: "
1784 << "t = " << Event_Queue::now() << std::endl;
1785 }
1786
1787 SendACK(true);
1788}
1789
1790
1791void TCP_Receiver::SendACK(bool sendConditionless)
1792{
1793 // sendConditionless is set
1794 // ... if packet was received out of order or
1795 // ... if delayed ACK timer has expired
1796
1797 // Bei eingeschaltetem "delayed ACK" wird ein ACK nur
1798 // gesendet, wenn das Fenster um 2MSS oder 35% der
1799 // maximalen Fenstergroesse verschoben worden ist
1800 // ... oder nach delayed ACK Timeout
1801 // ... oder wenn es das ACK fur ein Out of Order Segment ist
1802 // ... oder (in der Realitat), wenn ich auch was zu senden habe.
1803
1804 if (sendConditionless || !fDelayedACK ||
1805 (fReceiverBuffer.next_expected() - fAdvRcvNxt >= (int)(2 * fMSS)) ||
1806 (fReceiverBuffer.next_expected() - fAdvRcvNxt >=
1807 (int)(0.35 * fBufferSize))) {
1808 // Remark: RFC2581 recommends to acknowledge every second
1809 // packet conditionless (without setting this as a requirement)
1810 // in order to avoid excessive ack delays when the receiver MSS
1811 // is larger than the sender MSS. In this uni-directional
1812 // implementation, the receiver's MSS is not actively
1813 // used for sending but only for deciding when acknowledgments
1814 // have to be returned. Thus, the best solution to account for
1815 // RFC2581 is to set the receiver's MSS always equal to the
1816 // sender's MSS.
1817
1818 // Receiver Silly Window Syndrome Avoidance:
1819
1820 if (fAdvRcvNxt + fAdvRcvWnd + min(fBufferSize / 2, fMSS)
1821 <= fReceiverBuffer.first_byte() + fBufferSize) {
1822 // Die rechte Grenze des Empfangerfensters wird nur anders angezeigt
1823 // als beim letzten ACK, wenn sie sich seither um mindestens
1824 // min (BufferSize/ 2, MSS) geandert hat.
1825 fAdvRcvWnd = fBufferSize - fReceiverBuffer.first_block_size();
1826 }
1827 else {
1828 fAdvRcvWnd = fAdvRcvNxt + fAdvRcvWnd - fReceiverBuffer.next_expected();
1829 }
1830
1831 fAdvRcvNxt = fReceiverBuffer.next_expected();
1832
1833 if (fSendPeriodicACKs &&
1834 (!fStrictPeriodicACKs || !fPeriodicACKTimer.IsPending())) {
1835 fPeriodicACKTimer.Set(fPeriodicACKInterval);
1836 }
1837
1838 if (fDelayedACK && fDelayedACKTimer.IsPending()) {
1839 fDelayedACKTimer.Reset();
1840 }
1841
1842 ScheduleACKMessage();
1843 }
1844 else {
1845 if (!fDelayedACKTimer.IsPending()) {
1846 fDelayedACKTimer.Set(fACKDelayTime);
1847 if (fDebug) {
1848 std::cout << "TCP_Receiver::SendACK"
1849 << "receiver " << fLabel
1850 << ": set delACK timer: "
1851 << "t = " << Event_Queue::now() << std::endl;
1852 }
1853 }
1854 }
1855}
1856
1857
1858void TCP_Receiver::ScheduleACKMessage()
1859{
1860 if (fWaitingACKMsg == 0) {
1861 fWaitingACKMsg = new TCP_Packet;
1862 }
1863
1864 fWaitingACKMsg->set_ACK(fAdvRcvNxt);
1865 fWaitingACKMsg->set_wnd(fAdvRcvWnd);
1866 fWaitingACKMsg->set_session_id(fSessionId);
1867 fWaitingACKMsg->set_destination_port(fLabel);
1868 fWaitingACKMsg->set_source_port(fLabel);
1869 fWaitingACKMsg->set_bit_size(8*fTCPIPHeaderLength);
1870
1871 if (fACKSchedulingDelay > 0) {
1872 if (!fACKSchedulingTimer.IsPending()) {
1873 fACKSchedulingTimer.Set(fACKSchedulingDelay);
1874 }
1875 }
1876 else {
1877 SendACKMessage(Event_Queue::now());
1878 }
1879}
1880
1881
1882void TCP_Receiver::SendACKMessage(Ttype)
1883{
1884 it_assert(fWaitingACKMsg != 0, "TCP_Receiver::SendACKMessage, no ACK message waiting");
1885
1886 if (fDebug) {
1887 std::cout << "TCP_Receiver::SendACKMessage Ack sent"
1888 << "receiver " << fLabel
1889 << ": send ACK: "
1890 << "t = " << Event_Queue::now()
1891 << ", " << (*fWaitingACKMsg)
1892 << " byte_size=" << fWaitingACKMsg->bit_size() / 8
1893 << " ptr=" << fWaitingACKMsg << std::endl;
1894 }
1895
1896 tcp_send_ack(fWaitingACKMsg);
1897
1898 fWaitingACKMsg = 0;
1899}
1900
1901
1902void TCP_Receiver::TraceReceivedSeqNo(const Sequence_Number &sn)
1903{
1904 if (fDebug) {
1905 std::cout << "TCP_Receiver::TraceReceivedSeqNo "
1906 << "receiver " << fLabel
1907 << " t = " << Event_Queue::now()
1908 << " sn = " << sn << std::endl;
1909 }
1910 if (received_seq_num_index >= received_seq_num_time.size()) {
1911 received_seq_num_time.set_size(2*received_seq_num_time.size(), true);
1912 received_seq_num_val.set_size(2*received_seq_num_val.size(), true);
1913 }
1914 received_seq_num_val(received_seq_num_index) = sn.value();
1915 received_seq_num_time(received_seq_num_index) = Event_Queue::now();
1916 received_seq_num_index++;
1917}
1918
1919
1920void TCP_Receiver::save_trace(std::string filename)
1921{
1922
1923 received_seq_num_val.set_size(received_seq_num_index, true);
1924 received_seq_num_time.set_size(received_seq_num_index, true);
1925
1926 if (fDebug) {
1927 std::cout << "received_seq_num_val" << received_seq_num_val << std::endl;
1928 std::cout << "received_seq_num_time" << received_seq_num_time << std::endl;
1929 std::cout << "received_seq_num_index" << received_seq_num_index << std::endl;
1930 std::cout << "TCP_Receiver::saving to file: " << filename << std::endl;
1931 }
1932
1933 it_file ff2;
1934 ff2.open(filename);
1935
1936 ff2 << Name("received_seq_num_val") << received_seq_num_val;
1937 ff2 << Name("received_seq_num_time") << received_seq_num_time;
1938 ff2 << Name("received_seq_num_index") << received_seq_num_index;
1939
1940 ff2.flush();
1941 ff2.close();
1942}
1943
1944
1945} //namespace itpp
1946
1947#ifdef _MSC_VER
1948#pragma warning(default:4355)
1949#endif
1950
int bit_size()
get size of packet in bits
Definition packet.h:67
TCP_Segment()
ADD DOCUMENTATION HERE.
#define it_assert(t, s)
Abort if t is not true.
Definition itassert.h:94
vec exp(const vec &x)
Exp of the elements of a vector x.
Definition log_exp.h:155
T min(const Vec< T > &in)
Minimum value of vector.
Definition min_max.h:125
T max(const Vec< T > &v)
Maximum value of vector.
Definition min_max.h:45
Definition of classes for the IT++ file format.
itpp namespace
Definition itmex.h:37
std::ostream & operator<<(std::ostream &output, const bin &inbin)
Output stream of bin.
Definition binary.cpp:36
std::string to_str(const T &i)
Convert anything to string.
Definition converters.h:444
ITPP_EXPORT double round(double x)
Round to nearest integer, return result in double.
Definition of Transport Control Protocol (TCP)