Ferreteria/v0.6/clade/IO/Aspect/Connx/Process/@fx/Convey
Jump to navigation
Jump to search
About
Code: Current
As of 2026-01-04 (with lots of debugging code because db imports still aren't working):
#
/**
 * Conveys data from $oSrce to $oDest by way of this process's pipes.
 *
 * Opens both streams and this object, switches the pipes to non-blocking
 * mode, hands the actual transfer to a SenderClass instance, and then shuts
 * everything down again. Start and end are recorded in the in/out log file.
 *
 * @param StreamIface $oSrce stream handed to the sender as the data source
 * @param StreamIface $oDest stream handed to the sender as the destination
 *
 * Nothing is returned: the result of the sender's Convey() is stored in
 * $oItWent but not passed back to the caller.
 */
public function Convey(StreamIface $oSrce, StreamIface $oDest) {
    // NOTE(review): $oScrn is not read in this method since the send loop
    // moved to Sender; kept in case Screen() has side effects - confirm.
    $oScrn = self::Screen();
    $oReadout = $this->Readout();

    // Need to set these for the Readout:
    $this->RecverStream($oDest); // should really be *dest* stream
    $this->SenderStream($oSrce); // should really be *srce* stream

    $oSrce->Open();
    $oDest->Open();
    // NOTE(review): this result is overwritten below without being checked.
    $oItWent = $this->Open();

    $oaPipes = $this->OAPipes();
    /*
    $this->AmHereShort(CRLF
    .' - FROM : '.$oSrce->Inspect()->Render().CRLF
    .' - TO : '.$oDest->Inspect()->Render().CRLF
    .' - # OPEN = '.$oaPipes->CountOpen()
    );
    */

    // Set the streams to non-blocking mode; all 3 pipes are expected to comply:
    $n = $oaPipes->Block(FALSE);
    self::SoftAssert($n === 3,"Could only unblock $n pipe".($n==1?'':'s'));

    $ofLogIO = self::LogFile()->InOut();
    $ofLogIO->Open();
    $ofLogIO->WriteEntry("++++ Convey()");

    // NOTE(review): the three pipe handles and the byte total below are no
    // longer read here (the loop that used them now lives in Sender). They are
    // kept because the accessors may have side effects - confirm and remove.
    $qrDestPipe = $oaPipes->DestStream(); // where we are sending the data
    $qrErrsPipe = $oaPipes->ErrsStream(); // errors from the receiver
    $qrSrcePipe = $oaPipes->SrceStream(); // non-error responses from receiver
    #echo $qrDestPipe->ReflectThis()->Report();
    $nlSrceTotal = $oSrce->NTotalBytes();

    $oSender = new SenderClass;
    $oSender->OUpdater($this);
    $oSender->OLogger($ofLogIO);

    $oReadout->StartClock(); // reset the time counter

    // FIX: the source argument was previously the undefined variable $oSend.
    $oItWent = $oSender->Convey($oSrce,$oDest);

    $oSrce->Shut();
    $oDest->Shut();
    $this->Shut();

    $ofLogIO->WriteEntry("---- Convey()");
    $ofLogIO->Shut();
}
Code: Removed
2026-01-04
Moved to IO\Aspect\Connx\Stream\Buffer\Sender on or about 2026-01-02:
// moved to Sender on 2026-01-02 (ish)
// Archived SEND loop. While the source still has bytes and $doSendLoop is set:
// pull one line from $oSrce, normalise its ending, push it to the destination
// pipe, then poll the error pipe and the response pipe for anything sent back.
// Relies on variables prepared by the enclosing Convey() ($oSrce, $ofLogIO,
// $qrDestPipe, $qrErrsPipe, $qrSrcePipe, $oReadout, $oScrn, $sPiece, counters).
while ($oSrce->HasBytes() && $doSendLoop) { // SEND loop
$ofLogIO->WriteEntry("++++ ITERATION << Convey()");
// $sPiece is NULL when the previous piece was fully written; otherwise it
// still holds the unsent remainder and is retried without refilling.
if (is_null($sPiece)) {
// Refill $sPiece:
$qsPiece = $oSrce->PullLine();
// Strips newline, CR, tab, vertical-tab and NUL bytes from BOTH ends of the
// line (so leading tabs are removed too), then appends CRLF.
$sPiece = trim($qsPiece->GetIt(),"\n\r\t\v\x00") . CRLF;
$nlTried = strlen($sPiece);
$ofLogIO->WriteEntry("++++ TEXT TO SEND ($nlTried bytes):");
$ofLogIO->WriteEntry($sPiece);
$ofLogIO->WriteEntry("---- TEXT END");
}
#echo 'DEST PIPE: '.print_r(fstat($qrDestPipe->Native()),TRUE);
#echo $qrDestPipe->ReflectThis()->Report(); die();
// An integer result is treated as the number of bytes written; any other
// result takes the error branch at the bottom of the loop.
$ok = $qrDestPipe->PushBytes($sPiece);
#usleep(0.25 * 1000*1000); // DEBUGGING: pause
if (is_int($ok)) {
$nlDone = $ok; // $ok as integer = number of bytes sent
$nlDoneTotal += $nlDone;
$nlRemTotal = $nlSrceTotal - $nlDoneTotal;
$sXtraMsg = "DONE=[$nlDoneTotal] REM=[$nlRemTotal] (now: tried=[$nlTried] ok=[$nlDone])";
$nFailed = $nlTried - $nlDone;
if ($nFailed === 0) {
$sPiece = NULL; // ready for more
} else {
// Partial write: log it and keep only the bytes that were not sent.
// NOTE(review): $nlTried is not updated here, so on the retry it still
// holds the original piece length rather than the remainder's - confirm.
$sXtraMsg .= " - [$nFailed] FAILED!";
$ofLogIO->WriteEntry("!!!! FAILED to write $nFailed bytes");
$sPiece = substr($sPiece,$nlDone); // remove xmitted bytes from local buffer
# die("WRITE FAILURE ($nFailed bytes); QUITTING.".CRLF); // ...or throw exception? not sure if this is best way to handle...
}
$oReadout->Message($sXtraMsg);
$oReadout->OnStatusChange();
// Check for possible response from receiver:
$okErr = $qrErrsPipe->PullBytes(self::IBUFF_SIZE);
if (is_string($okErr) && ($okErr !== '')) {
// NOTE(review): inside this branch $ok is an integer (the byte count from
// PushBytes), not the received text; $okErr looks like the intended value.
// As written, the length check and 'Unknown command' test below operate
// on the byte count rather than on the error text.
$sMsg = $ok; // $ok is received text
$nlMsg = strlen($sMsg);
if ($nlMsg > 64) {
// NOTE(review): substr($sMsg,32) yields everything from offset 32 onward,
// i.e. it drops the first 32 bytes. If a 32-byte preview was intended,
// that would be substr($sMsg,0,32) - confirm.
$ftMsg = '"'.substr($sMsg,32).'"'." ($nlMsg chars total)";
} else {
$ftMsg = $sMsg;
}
echo CRLF.$oScrn->ErrorIt('Received Error Message').': '.$ftMsg.CRLF;
$ofLogIO->WriteEntry("++++ RECEIVED ERROR ++++");
$ofLogIO->WriteEntry($ftMsg);
$ofLogIO->WriteEntry("==== BYTES ERRORED ON:");
$ofLogIO->WriteEntry($sPiece);
$ofLogIO->WriteEntry("==== STATS:");
$ofLogIO->WriteEntry("TRIED = [$nlTried] / SENT = [$nlDone]");
$ofLogIO->WriteEntry("---- RECEIVED ERROR ----");
// TODO: this needs to be checked by the sender, somehow, because it's target-specific
if (str_contains($sMsg,'Unknown command')) {
// Clearing the flag ends the SEND loop at the next condition check.
$doSendLoop = FALSE;
echo $oScrn->ErrorIt('Stopping').' due to parsing error on remote.'.CRLF;
}
}
$okSrce = $qrSrcePipe->PullBytes(self::IBUFF_SIZE);
if (is_string($okSrce) && ($okSrce !== '')) {
// NOTE(review): this echoes $ok (the byte count), not the text just read
// into $okSrce - confirm which was intended.
echo CRLF.$oScrn->InfoIt('Received Message').': '.$ok.CRLF;
}
} else {
// Write error: report it on screen and in the log, dump the bytes that
// could not be written, then terminate the script.
$sMsg = "$nlTried bytes to output of ".$this->DescribeInline();
echo $oScrn->ErrorIt('Error writing')." $nlTried bytes to output of ".$this->DescribeInline().CRLF;
$crlf = CRLF;
echo "++++ FAILED BYTES ++++$crlf$sPiece$crlf---- FAILED BYTES ----$crlf";
$ofLogIO->WriteEntry("!!!! ERROR writing $sMsg.");
// NOTE(review): die() exits here without shutting $ofLogIO or the streams.
die();
}
$ofLogIO->WriteEntry("---- ITERATION << Convey()");
}
2025-12-23
Removed on 2025-12-21 from the end of the if (is_int($ok)) block:
#
//* 2025-12-21 much of this has been moved to QPipe->ReadAll()
// Archived RESPONSE loop. Repeatedly reads from the error pipe and reports
// each non-empty chunk on screen and in the log. The loop ends when a read
// returns an empty string or a non-string value.
// NOTE(review): $rErrsPipe and $nBufSize are not defined in the visible code;
// presumably they were set up by the enclosing method - confirm.
$doRecvLoop = TRUE;
while ($doRecvLoop) { // RESPONSE loop
$sMsg = stream_get_contents($rErrsPipe,$nBufSize);
if (is_string($sMsg)) {
if ($sMsg === '') {
$doRecvLoop = FALSE; // no more to receive
} else {
$nlMsg = strlen($sMsg);
if ($nlMsg > 64) {
// NOTE(review): substr($sMsg,32) yields everything from offset 32 onward,
// i.e. it drops the first 32 bytes. If a 32-byte preview was intended,
// that would be substr($sMsg,0,32) - confirm.
$ftMsg = '"'.substr($sMsg,32).'"'." ($nlMsg chars total)";
} else {
$ftMsg = $sMsg;
}
echo CRLF.$oScrn->ErrorIt('Received Error Message').': '.$ftMsg.CRLF;
$ofLogIO->WriteEntry("++++ RECEIVED ERROR ++++");
$ofLogIO->WriteEntry($ftMsg);
$ofLogIO->WriteEntry("==== BYTES ERRORED ON:");
$ofLogIO->WriteEntry($sPiece);
$ofLogIO->WriteEntry("==== STATS:");
$ofLogIO->WriteEntry("TRIED = [$nlTried] / SENT = [$nlDone]");
$ofLogIO->WriteEntry("---- RECEIVED ERROR ----");
// TODO: this needs to be checked by the sender, somehow, because it's target-specific
if (str_contains($sMsg,'Unknown command')) {
// Clears the flag tested by the outer SEND loop. This RESPONSE loop keeps
// reading until the pipe yields an empty string or a non-string.
$doSendLoop = FALSE;
echo $oScrn->ErrorIt('Stopping').' due to parsing error on remote.'.CRLF;
}
}
} else {
// A non-string result (stream_get_contents returns false on failure)
// also ends the loop.
$doRecvLoop = FALSE;
}
}