Ferreteria/v0.6/clade/IO/Aspect/Connx/aux/QStream

From Woozle Writes Code
< Ferreteria‎ | v0.6‎ | clade‎ | IO‎ | Aspect‎ | Connx‎ | aux
Jump to navigation Jump to search
clade: IO\Aspect\Connx\aux\QStream
Clade Family
Res QStream (none)
Clade Aliases
Alias Clade
ActionIface Sys\Events\ItWent
Base* [c,i] Data\Mem\QVar\Res
ReadResult* [c,i] Sys\Events\ItWent\ReadOp
Subpages

About

  • Purpose: wraps a stream resource-type

History

  • 2025-12-08 [xTODO] Explore the overlap between this and Sys\FileSys\Aspect\InOut.
    • This clade handles a file as a stream, while InOut handles the broader random-access functionality (which allows streaming but doesn't focus on it).
    • This clade can also handle non-file, non-random-access streams.
  • 2025-12-24 renamed from QPipe (because it originally handled resources in the array set by proc_open()) to QStream

Code

As of 2025-12-24 (FRead() is still in development):

interface iQStream extends BaseIface {
    function SetBlocking(bool $bOn) : bool;  // stream_set_blocking()
    function SReadAll() : bool|string;
    function WriteTry(string $sData) : bool|int;  // try once to write the data
    function WriteAll(string $sData) : bool|int;  // write all the data, retrying as needed
}
class cQStream extends BaseClass implements iQStream {

    // ++ CONFIG ++ //

    const MAX_TRIES = 5;
    const WAIT_USEC = 1000*1000 * 0.25;
    const TIMEOUT_SEC = 10;
    const IBUFF_SIZE = 1024*1024; // 1M input buffer
    const OBUFF_SIZE = 1024*64;   // 64k output buffer

    // ++ MODE ++ //

    public function SetBlocking(bool $bOn) : bool {
        return stream_set_blocking($this->GetIt(),$bOn);
    }

    // -- MODE -- //
    // ++ INTERNALS ++ //

    protected function GetStatus() : array { return fstat($this->GetIt()); }

    // -- INTERNALS -- //
    // ++ ACTION ++ //

    public function SReadAll() : bool|string {
        $doRecvLoop = TRUE;
        $sRtn = '';
        $rPipe = $this->GetIt();
        while ($doRecvLoop) { // RESPONSE loop
            $sPiece = stream_get_contents($rPipe,self::IBUFF_SIZE);
            if (is_string($sPiece)) {
                if ($sPiece === '') {
                    // no more to receive -- we're done here:
                    $doRecvLoop = FALSE;
                } else {
                    // non-empty string received -- keep it:
                    $sRtn .= $sPiece;
                }
            } else {
                // There was a problem; finish up.
                $doRecvLoop = FALSE;
                if ($sRtn !== '') {
                    echo $this->CodingPrompt("Data received before prompt -- need a way to handle it. DATA: $sRtn"); die();
                }
            }
        }
        return $sRtn;
    }
    // PURPOSE: Like SReadAll(), but the stream must be a file (perhaps this should be another class?)
    // TODO: set error in $oRes if there's a read-error
    public function FReadAll(bool $doWait) : ReadResultIface {
        $nStart = time();
        $sRtn = NULL;
        $rPipe = $this->GetIt();
        $didWait = $doWait;
        while (!feof($rPipe) && $doWait) {
            $sPiece = fread($rPipe,self::IBUFF_SIZE);
            if (empty($sPiece))  {
                // Time-out if I/O hangs for more than 5 seconds:
                $doWait = (time() <= ($nStart+self::TIMEOUT_SEC));    // turns off after TIMEOUT_SEC seconds of no data
            } else {
                $sRtn .= $sPiece;
                $nStart = time();   // got data, so reset timer
            }
        }
        $oRes = new ReadResultClass;
        if (is_string($sRtn)) {
            $oRes->QData()->SetIt($sRtn);
        }
        $oRes->TimedOut($didWait && !$doWait);
        return $oRes;
    }

    public function WriteTry(string $sData) : bool|int { return @fwrite($this->GetIt(),$sData); }
    public function WriteAll(string $sData) : bool|int {
        $nTry = 0;
        $nToSendFull = strlen($sData);
        $sToSendFull = $sData;
        $nToSendBurst = 0;
        $isOk = TRUE;
        while ($sToSendFull !== '') {
          $this->AmHere("TOP OUTER loop: length=".strlen($sToSendFull));
            while (($nTry < self::MAX_TRIES) && ($nToSendBurst > 0) && $isOk) {
                $sToSendBurst = substr($sToSendFull,0,self::OBUFF_SIZE); // get next chunk to send (from start of remainder)
                $sToSendFull = substr($sToSendFull,self::OBUFF_SIZE);    // remove chunk from internal buffer
              $this->AmHere("BEFORE BURST: nTry=$nTry burst length=$nToSendBurst");
                $ok = $this->WriteBurst($sToSendBurst,$nTry);
              $this->AmHere("AFTER BURST: nTry=$nTry burst length=".strlen($sToSendBurst));
                if (is_int($ok)) {
                } else {
                    $isOk = FALSE;
                }
            }
          $this->AmHere("END OUTER loop: length=".strlen($sToSendFull));
        }
        return $ok;
    }
    protected function WriteBurst(string $sToSend, int &$nTry) : int|false{
        $nToSend = strlen($sToSend);
        if ($nToSend > 0) {
            $ok = $this->WriteTry($sToSend);
            if (is_int($ok)) {
                if ($ok <= $nToSendBurst) {
                    if ($ok === 0) {
                        $nTry++;
                        usleep(self::WAIT_USEC);
                        echo '.'; // TODO: remove this or have a callback for showing retries
                    } else {
                        // SOME data sent -- reset counter (we're making progress):
                        $nTry = 0;
                    }
                    // just get the bytes not sent, to retry
                    $sToSendBurst = substr($sToSendBurst,$ok);
                }
            }
        }
    }

    // ++ ACTION ++ //
}