Ferreteria/v0.6/clade/IO/Aspect/Connx/Process/@fx/Convey

From Woozle Writes Code
< Ferreteria‎ | v0.6‎ | clade‎ | IO‎ | Aspect‎ | Connx‎ | Process
Jump to navigation Jump to search

About

Code: Current

As of 2026-01-04 (with lots of debugging code because db imports still aren't working):

#
    public function Convey(StreamIface $oSrce, StreamIface $oDest) {
        $oScrn = self::Screen();
        $oReadout = $this->Readout();

        // Need to set these for the Readout:
        $this->RecverStream($oDest);  // should really be *dest* stream
        $this->SenderStream($oSrce);  // should really be *srce* stream

        $oSrce->Open();
        $oDest->Open();
        $oItWent = $this->Open();

        $oaPipes = $this->OAPipes();

        /*
        $this->AmHereShort(CRLF
          .' - FROM : '.$oSrce->Inspect()->Render().CRLF
          .' - TO   : '.$oDest->Inspect()->Render().CRLF
          .' - # OPEN = '.$oaPipes->CountOpen()
          );
          */

        // Set the streams to non-blocking mode:
        $n = $oaPipes->Block(FALSE);
        self::SoftAssert($n === 3,"Could only unblock $n pipe".($n==1?'':'s'));

        $ofLogIO = self::LogFile()->InOut();
        $ofLogIO->Open();
        $ofLogIO->WriteEntry("++++ Convey()");

        $qrDestPipe = $oaPipes->DestStream(); // where we are sending the data
        $qrErrsPipe = $oaPipes->ErrsStream(); // errors from the receiver
        $qrSrcePipe = $oaPipes->SrceStream(); // non-error responses from receiver

        #echo $qrDestPipe->ReflectThis()->Report();

        $doSendLoop = TRUE;
        $nlDone = '-';  // initial value
        $nlTried = '-';
        $sPiece = NULL;
        $nlSrceTotal = $oSrce->NTotalBytes();
        $nlDoneTotal = 0;
        $oSender = new SenderClass;
        $oSender->OUpdater($this);
        $oSender->OLogger($ofLogIO);
        $oReadout->StartClock();  // reset the time counter

        $oItWent = $oSender->Convey($oSend,$oDest);

        $oSrce->Shut();
        $oDest->Shut();
        $this->Shut();
        $ofLogIO->WriteEntry("---- Convey()");
        $ofLogIO->Shut();
    }

Code: Removed

2026-01-04

Moved to IO\Aspect\Connx\Stream\Buffer\Sender on or about 2026-01-02:

// moved to Sender on 2026-01-02 (ish)
        while ($oSrce->HasBytes() && $doSendLoop) { // SEND loop
            $ofLogIO->WriteEntry("++++ ITERATION << Convey()");
            if (is_null($sPiece)) {
                // Refill $sPiece:
                $qsPiece = $oSrce->PullLine();
                $sPiece = trim($qsPiece->GetIt(),"\n\r\t\v\x00") . CRLF;
                $nlTried = strlen($sPiece);
                $ofLogIO->WriteEntry("++++ TEXT TO SEND ($nlTried bytes):");
                $ofLogIO->WriteEntry($sPiece);
                $ofLogIO->WriteEntry("---- TEXT END");
            }

            #echo 'DEST PIPE: '.print_r(fstat($qrDestPipe->Native()),TRUE);
            #echo $qrDestPipe->ReflectThis()->Report(); die();
            $ok = $qrDestPipe->PushBytes($sPiece);
            #usleep(0.25 * 1000*1000); // DEBUGGING: pause

            if (is_int($ok)) {
                $nlDone = $ok;  // $ok as integer = number of bytes sent
                $nlDoneTotal += $nlDone;
                $nlRemTotal = $nlSrceTotal - $nlDoneTotal;
                $sXtraMsg = "DONE=[$nlDoneTotal] REM=[$nlRemTotal] (now: tried=[$nlTried] ok=[$nlDone])";
                $nFailed = $nlTried - $nlDone;
                if ($nFailed === 0) {
                    $sPiece = NULL; // ready for more
                } else {
                    $sXtraMsg .= " - [$nFailed] FAILED!";
                    $ofLogIO->WriteEntry("!!!! FAILED to write $nFailed bytes");
                    $sPiece = substr($sPiece,$nlDone);  // remove xmitted bytes from local buffer
                   # die("WRITE FAILURE ($nFailed bytes); QUITTING.".CRLF);  // ...or throw exception? not sure if this is best way to handle...
                }
                $oReadout->Message($sXtraMsg);
                $oReadout->OnStatusChange();

                // Check for possible response from receiver:

                $okErr = $qrErrsPipe->PullBytes(self::IBUFF_SIZE);
                if (is_string($okErr) && ($okErr !== '')) {
                    $sMsg = $ok;  // $ok is received text
                    $nlMsg = strlen($sMsg);
                    if ($nlMsg > 64) {
                        $ftMsg = '"'.substr($sMsg,32).'"'." ($nlMsg chars total)";
                    } else {
                        $ftMsg = $sMsg;
                    }
                    echo CRLF.$oScrn->ErrorIt('Received Error Message').': '.$ftMsg.CRLF;
                    $ofLogIO->WriteEntry("++++ RECEIVED ERROR ++++");
                    $ofLogIO->WriteEntry($ftMsg);
                    $ofLogIO->WriteEntry("==== BYTES ERRORED ON:");
                    $ofLogIO->WriteEntry($sPiece);
                    $ofLogIO->WriteEntry("==== STATS:");
                    $ofLogIO->WriteEntry("TRIED = [$nlTried] / SENT = [$nlDone]");
                    $ofLogIO->WriteEntry("---- RECEIVED ERROR ----");

                    // TODO: this needs to be checked by the sender, somehow, because it's target-specific
                    if (str_contains($sMsg,'Unknown command')) {
                        $doSendLoop = FALSE;
                        echo $oScrn->ErrorIt('Stopping').' due to parsing error on remote.'.CRLF;
                    }
                }

                $okSrce = $qrSrcePipe->PullBytes(self::IBUFF_SIZE);
                if (is_string($okSrce) && ($okSrce !== '')) {
                    echo CRLF.$oScrn->InfoIt('Received Message').': '.$ok.CRLF;
                }

            } else {
                $sMsg = "$nlTried bytes to output of ".$this->DescribeInline();
                echo $oScrn->ErrorIt('Error writing')." $nlTried bytes to output of ".$this->DescribeInline().CRLF;
                $crlf = CRLF;
                echo "++++ FAILED BYTES ++++$crlf$sPiece$crlf---- FAILED BYTES ----$crlf";
                $ofLogIO->WriteEntry("!!!! ERROR writing $sMsg.");
                die();
            }
            $ofLogIO->WriteEntry("---- ITERATION << Convey()");
        }

2025-12-23

Removed on 2025-12-21 from the end of the if (is_int($ok)) loop:

#
                //* 2025-12-21 much of this has been moved to QPipe->ReadAll()
                $doRecvLoop = TRUE;
                while ($doRecvLoop) { // RESPONSE loop
                    $sMsg = stream_get_contents($rErrsPipe,$nBufSize);
                    if (is_string($sMsg)) {
                        if ($sMsg === '') {
                            $doRecvLoop = FALSE; // no more to receive
                        } else {
                            $nlMsg = strlen($sMsg);
                            if ($nlMsg > 64) {
                                $ftMsg = '"'.substr($sMsg,32).'"'." ($nlMsg chars total)";
                            } else {
                                $ftMsg = $sMsg;
                            }
                            echo CRLF.$oScrn->ErrorIt('Received Error Message').': '.$ftMsg.CRLF;
                            $ofLogIO->WriteEntry("++++ RECEIVED ERROR ++++");
                            $ofLogIO->WriteEntry($ftMsg);
                            $ofLogIO->WriteEntry("==== BYTES ERRORED ON:");
                            $ofLogIO->WriteEntry($sPiece);
                            $ofLogIO->WriteEntry("==== STATS:");
                            $ofLogIO->WriteEntry("TRIED = [$nlTried] / SENT = [$nlDone]");
                            $ofLogIO->WriteEntry("---- RECEIVED ERROR ----");

                            // TODO: this needs to be checked by the sender, somehow, because it's target-specific
                            if (str_contains($sMsg,'Unknown command')) {
                                $doSendLoop = FALSE;
                                echo $oScrn->ErrorIt('Stopping').' due to parsing error on remote.'.CRLF;
                            }
                        }
                    } else {
                        $doRecvLoop = FALSE;
                    }
                }