Ferreteria/v0.6/clade/IO/Aspect/Connx/Conveyer

From Woozle Writes Code
< Ferreteria‎ | v0.6‎ | clade‎ | IO‎ | Aspect‎ | Connx
Jump to navigation Jump to search
clade: IO\Aspect\Connx\Conveyer
Clade Family
Connx Conveyer (none)
Clade Aliases
Alias Clade
Base* [ca,i] IO\Aspect\Connx
Factory* [c,i] aux\Factory
OpConvey* [c,i] IO\Aspect\Connx\aux\itWent\OpComm
OpOpen* [c,i] IO\Aspect\Connx\aux\itWent\OpComm
OpShut* [c,i] Sys\Events\ItWent
SelfIface IO\Aspect\Connx\Conveyer
StreamIface IO\Aspect\Connx\Stream
Subpages

About

  • Note that this class only determines the class of the Updater, which does not talk directly to the output device.
    • The Updater sends all output via the Panel which is set in the app config.

History

  • 2026-02-19 xTODO Does this really have any business descending from Buffer? Maybe it should be in Aspect/aux or something?
  • 2026-02-26 Trying a relatively-mild refactoring in which it now descends from Connx
  • 2026-04-08 This was working pretty well but then I broke something somewhere, so now I'm rewriting it to be more fixable.

Code

  • as of 2026-04-08 this was working, but then I broke something and now I'm about to do a substantial rewrite so I thought I should preserve its current state:
interface iConveyer extends BaseIface {
    // SETUP: static
    static function FromPair(StreamIface $oSrce, StreamIface $oDest) : SelfIface;
    // SETUP: dynamic
    function Connect(StreamIface $oSrce, StreamIface $oDest);
    // EVENTS
    function OnBefore();
    function OnAfter();
    // I/O
    function ConveyCheck() : OpConveyIface;
    #function ConveyNow(StreamIface $oSrce, StreamIface $oDest) : OpConveyIface;
}
class cConveyer extends BaseClass implements iConveyer {
    // ++ CONFIG ++ //

    #const IBUFF_SIZE = 1024*64;  // 64k input buffer
    const IBUFF_SIZE = 1024*1024; // 1M input buffer
    #const OBUFF_SIZE = 1024;     // 1k output buffer
    #const OBUFF_SIZE = 1024*64;  // 64k output buffer
    const OBUFF_SIZE = 1024*1024; // 1M output buffer

    public function SType() : string { return 'conveyer'; }

    // -- CONFIG -- //
    // ++ SETUP: static ++ //

    public static function FromPair(StreamIface $oSrce, StreamIface $oDest) : SelfIface {
        $oThis = new static;
        $oThis->Connect($oSrce,$oDest);
        return $oThis;
    }

    // ++ SETUP: dynamic ++ //

    public function Connect(StreamIface $oSrce, StreamIface $oDest) {
        $this->oSrce = $oSrce;
        $this->oDest = $oDest;
        #$this->AmHere('CONNECTING: ['.$oSrce->VIEW_Inline().'] => ['.$oDest->VIEW_Inline().']');
    }

    // -- SETUP -- //
    // ++ SETTINGS ++ //

    private $oSrce;
    protected function OSrceStream() : StreamIface { return $this->oSrce; }
    private $oDest;
    protected function ODestStream() : StreamIface { return $this->oDest; }

    private static $oLog=NULL;
    public static function OLogger(?LoggerIface $o=NULL) : LoggerIface { return is_null($o) ? self::$oLog : (self::$oLog = $o); }

    // -- SETTINGS -- //
    // ++ OBJECTS ++ //

    private $oOpConveyFactory = NULL;
    protected function OOpConvey() : OpConveyIface {
        $oFact = ($this->oOpConveyFactory ??= FactoryClass::FromClass(OpConveyClass::class));
        return $oFact->OTarget();
    }

    // -- OBJECTS -- //
    // ++ LIFECYCLE ++ //

    protected function ActualOpen() : OpOpenIface {
        $oActSrce = $this->OSrceStream()->Open();
        $oActDest = $this->ODestStream()->Open();
        $oAct = $this->OpOpen();
        $oAct->Assimilate($oActSrce);
        $oAct->Assimilate($oActDest);
        return $oAct;
    }
    protected function ActualShut() : OpShutIface {
        $oActSrce = $this->OSrceStream()->Shut();
        $oActDest = $this->ODestStream()->Shut();
        $oAct = $this->OpShut();
        $oAct->Assimilate($oActSrce);
        $oAct->Assimilate($oActDest);
        return $oAct;
    }

    // -- LIFECYCLE -- //
    // ++ EVENTS ++ //

    public function OnBefore() {
        $oSrce = $this->OSrceStream();
        $oDest = $this->ODestStream();
        $qoUpdSrce = $oSrce->QOUpdater();
        $qoUpdDest = $oDest->QOUpdater();
        $qoUpdSrce->OnBefore();
        $qoUpdDest->OnBefore();
    }
    public function OnAfter() {
        $oSrce = $this->OSrceStream();
        $oDest = $this->ODestStream();
        $qoUpdSrce = $oSrce->QOUpdater();
        $qoUpdDest = $oDest->QOUpdater();
        $qoUpdSrce->OnAfter();
        $qoUpdDest->OnAfter();
    }

    // -- EVENTS -- //
    // ++ I/O: streams ++ //

    // ASSUMES: Srce and Dest are both open. (It doesn't really make sense to force them open/shut here.)
    public function ConveyCheck() : OpConveyIface {
        $oSrce = $this->OSrceStream();
        $oDest = $this->ODestStream();
        $qoUpdSrce = $oSrce->QOUpdater();
        $qoUpdDest = $oDest->QOUpdater();

        $oHow = $this->OOpConvey();

        // $sGot refill check:
        if (empty($this->sGot)) {  // if buffer is empty --
            $this->CheckConveyIn($oSrce);
            $qoUpdSrce->OnIterate();
            $qoUpdDest->OnIterate();
        }

        $this->sPut = NULL; // just for rigor / clarity
        #while ($doOLoop && ($this->sGot !== '')) {
        if ($this->sGot !== '') {
            // This loop keeps trying to send pieces of $sGot (via $sPut) until finished or there's an error.
            // If $sPut is empty, it refills it from $sGot.
            // If $sGot is empty, we exit the loop.

            // 2026-02-22 TODO: maybe this should use stream_copy_to_stream().

            #$this->AmHereShort('$oSrce: '.$oSrce->VIEW_Inline());
            if (empty($this->sPut)) {
                $this->CheckConveyOut1();
                $qoUpdSrce->OnIterate();
                $qoUpdDest->OnIterate();
            }
            if ($this->sPut !== '') {  // (minor optimization) if the piece hasn't been completely sent --
                $this->CheckConveyOut2($oDest);
            }
            $qoUpdSrce->OnIterate();
            $qoUpdDest->OnIterate();
        }

        // prep for next loop-thru:
        #if (!$oSrce->HasMore()) { $doILoop = FALSE; }
        $oHow->SetOkay(TRUE); // 2026-04-01 For now, there aren't any error conditions here. (There probably should be.)
        return $oHow;
    }

    // 2026-04-01 I *almost* want to package these ops in a separate class...
    // $sGot = data currently pulled from input stream ($oSrce)
    // $sPut = data remaining to push to output stream ($oDest)
    private $sGot;
    private $sPut;
    /* 2026-04-08 no longer used, I think
    public function ConveyNow(StreamIface $oSrce, StreamIface $oDest) : OpConveyIface {
        $oSrce->Open();
        $oDest->Open();

        $qoUpdSrce = $oSrce->QOUpdater();
        $qoUpdDest = $oDest->QOUpdater();

        $oHow = $this->OOpConvey();
        $doILoop = $ok = $oSrce->HasMore();
        $qoUpdSrce->OnBefore();
        $qoUpdDest->OnBefore();

        $this->AmHere();

        $this->sGot = '';
        while ($doILoop) {   // while there's input to process --
            // $sGot refill check:
            if (empty($this->sGot)) {  // if buffer is empty --
                $this->CheckConveyIn($oSrce);
                $qoUpdSrce->OnIterate();
                $qoUpdDest->OnIterate();
                $doILoop = $oSrce->HasMore();   // if we didn't reach EoF, keep looping
            }

            // $sGot -> $sPut loop
            // -- repeatedly break $sGot into OBUFF_SIZE pieces and try to send/write them:

            #$doOLoop = TRUE;  // do output loop
            $this->sPut = NULL; // just for rigor / clarity
            #while ($doOLoop && ($this->sGot !== '')) {
            if ($this->sGot !== '') {
                // This loop keeps trying to send pieces of $sGot (via $sPut) until finished or there's an error.
                // If $sPut is empty, it refills it from $sGot.
                // If $sGot is empty, we exit the loop.

                // 2026-02-22 TODO: maybe this should use stream_copy_to_stream().

                #$this->AmHereShort('$oSrce: '.$oSrce->VIEW_Inline());
                if (empty($this->sPut)) {
                    $this->CheckConveyOut1();
                    $qoUpdSrce->OnIterate();
                    $qoUpdDest->OnIterate();
                }
                if ($this->sPut !== '') {  // (minor optimization) if the piece hasn't been completely sent --
                    $this->CheckConveyOut2($oDest);
                }
                $qoUpdSrce->OnIterate();
                $qoUpdDest->OnIterate();
            }

            // prep for next loop-thru:
            if (!$oSrce->HasMore()) { $doILoop = FALSE; }
        }
        $qoUpdSrce->OnAfter();
        $qoUpdDest->OnAfter();

        $oSrce->Shut();
        $oDest->Shut();

        $oHow->SetOkay($ok);  // NOTE 2026-01-02: This will error if there's no data to send. Is that a problem?
        return $oHow;
    }
    */
    protected function CheckConveyIn(StreamIface $oSrce) {
        $oGot = $oSrce->PullBytes(self::IBUFF_SIZE);
        $qsGot = $oGot->QData();
        $this->sGot .= $qsGot->GetItNz();
        #$this->sBuff = $this->sGot;           // update object-globally for stats readout
        $this->AmHere('RECEIVED=['.$this->sGot.']');

        #$this->AmHereShort("ILOOP?[$doILoop] PULL results: ".($qsGot->HasIt() ? (strlen($qsGot->GetIt()).' bytes') : 'NOPE'));
        #echo $doILoop ? 'Y' : 'n';
        #echo $oSrce->ReflectThis()->Report(); die(); // where is HasMore()?

        // ++ DEBUG
        // # of bytes received since $oSrce was opened
        #$nlPull = $oSrce->NPulledCount();
        #$sPull = number_format($nlPull);
        #$sDiag = "PULLED BYTES TOTAL: $sPull; More?[$doILoop]";
        #$this->AmHere($sDiag);
        #$this->AmHereShort('$oSrce: '.$oSrce->VIEW_Inline());
        #echo $oSrce->ReflectThis()->Report(); die(); // where is HasMore()?
        // -- DEBUG

        #$oSrce->AmHere('UPDATER SET?['.$qoUpd->HasIt().']'); die();
    }
    protected function CheckConveyOut1() {
        $this->sPut = substr($this->sGot,0,self::OBUFF_SIZE);   // get piece to send
        $this->sGot = substr($this->sGot,strlen($this->sPut));  // remove from $sGot
        #$this->sBuff = $this->sGot;                             // update object-global (for stats readout)
        $nlPut = strlen($this->sPut);                           // get length of piece
        $this->sGot = substr($this->sGot,$nlPut);               // remove those bytes from the input buffer
        #$sDiag = "GOT $nlPut bytes from line-buffer";
        #$this->LogEntry($sDiag);
        #$this->AmHereShort($sDiag);
        #$this->AmHere($sDiag);
    }
    protected function CheckConveyOut2(StreamIface $oDest) {
        $sPut = $this->sPut;
        $nlTry = strlen($sPut);                   // how many bytes are we trying?
        #$this->AmHereShort("SENDING $nlTry BYTES");
        #$sLog = "++ SENDING $nlTry BYTES from id".$oDest->ObjectID()."++".CRLF.$sPut.'-- SENDING --';
        #$this->LogEntry($sLog);
        #$this->AmHere($sLog);

        $oOp = $oDest->PushBytes($sPut);          // try to send them

        if ($oOp->GetOkay()) {
            $nlDone = $oOp->BytesDone();  // # bytes sent
            $this->sPut = substr($this->sPut,$nlDone);     // remove bytes-accepted from burst-buffer
            #$this->AmHereShort("SENT $nlDone BYTES");
            $nlRem = $nlTry - $nlDone;
            $nlGot = strlen($this->sGot);
            $this->HardAssert($nlRem === strlen($this->sPut),'Internal calculation bug!');

            // ++ DEBUG
            #$sStatus = (($nlDone===$nlTry) ? 'all sent!' : "ONLY SENT $nlDone");
            #$sLog = "TRIED $nlTry bytes: $sStatus ($nlRem burst-bytes left, $nlGot bytes in read)";
            #$this->AmHereShort($sLog);
            #$this->LogEntry($sLog);
            #$this->AmHere($sLog);
            #$sLog = "SENT: [".substr($this->sPut,0,$nlDone).']';
            #$this->LogEntry($sLog);
            #$this->AmHere($sLog);
            // -- DEBUG

        } else {
            // error when sending -- stop the loops:
            $doILoop = $doOLoop = FALSE;
            // flag that there was an error
            $ok = FALSE;
            $this->LogEntry("ERROR sending $nlTry bytes");
            $this->AmHere("ERROR sending $nlTry bytes");
        }
        #$this->AmHere('DONE SENDING'); die();
    }

    // ++ I/O: UI ++ //

    public function DescribeInline() : string {
        return
          '{'
          .$this->OSrceStream()->VIEW_Inline()
          .'} -> {'
          .$this->ODestStream()->VIEW_Inline()
          .'}';
    }

    // -- I/O -- //
    // ++ DIAGS: screen ++ //

    public function VIEW_AsBlock() : string {
        return
          '> SRCE: '.$this->OSrceStream()->VIEW_Inline().CRLF.
          '> DEST: '.$this->ODestStream()->VIEW_Inline().CRLF;
    }

    // ++ DIAGS: logs ++ //

    protected function LogEntry(string $s) {
        if (is_object(self::$oLog)) {
            self::$oLog->WriteEntry($s);
        }
    }

    // -- DIAGS -- //

}