Ferreteria/v0.6/clade/IO/Aspect/Connx/Process

From Woozle Writes Code
< Ferreteria‎ | v0.6‎ | clade‎ | IO‎ | Aspect‎ | Connx
Revision as of 12:58, 26 November 2025 by Woozle (talk | contribs)
Jump to navigation Jump to search
clade: IO\Aspect\Connx\Process
Clade Family
Connx Process (none)
Clade Aliases
Alias Clade
Action* [c,i] Sys\Events\ItWent
Base* [ca,i] IO\Aspect\Connx
BufferIface IO\Aspect\Connx\Buffer
CommOp* [c,i] Sys\Events\ItWent\CommOp
MemBuffClass IO\Aspect\Connx\Buffer\InMem
SelfIface IO\Aspect\Connx\Process
Subpages

Code

as of 2025-11-26:

interface iProcess extends BaseIface {
    // SETUP
    static function FromCommand(string|array $saCmd) : self;
    #static function FromBuffers(BufferIface $oRecv, BufferIface $oSend) : self;
    // ACTION
    function Run() : CommOpIface;
}
class cProcess extends BaseClass implements SelfIface {

    // ++ SETUP ++ //

    protected function __construct(){}  // no "new" operator

    static public function FromCommand(string|array $sa) : SelfIface {
        $oThis = new static;
        $oThis->SetCommand($sa);
        return $oThis;
    }

    // ++ SETUP: dynamic / access ++ //

    private $oBufRecv = NULL;
    public function SetRecvBuff(BufferIface $o) { $this->oBufRecv = $o; }
    protected function GetRecvBuff() : BufferIface { return $this->oBufRecv; }

    private $oBufSend = NULL;
    public function SetSendBuff(BufferIface $o) { $this->oBufSend = $o; }
    protected function GetSendBuff() : BufferIface { return $this->oBufSend; }

    private $oBuffErrs = NULL;
    protected function GetErrsBuff() : BufferIface { return $this->oBuffErrs ?? ($this->oBuffErrs = new MemBuffClass); }

    private $saCmd = NULL;
    protected function SetCommand(string|array $sa) { $this->saCmd = $sa; }
    protected function GetCommand() : string|array { return $this->saCmd; }

    private $oStatus = NULL;
    public function SetOpStator(CommOpIface $o) { $this->oStatus = $o; }
    #public function GetOpStator() : CommOpIface { return $this->oStatus ?? ($this->oStatus = new CommOpClass); }
    public function GetOpStator() : CommOpIface { return $this->oStatus; }  // 2025-06-05 Caller needs to set this.

    // -- SETUP -- //
    // ++ ACCESS: internal ++ //

    private $arPipes;
    protected function DestPipe() : mixed { return $this->arPipes[0]; } // was IPipe (WRITE to this)
    protected function SrcePipe() : mixed { return $this->arPipes[1]; } // was SrcePipe (READ from this)
    protected function ErrsPipe() : mixed { return $this->arPipes[2]; } // was EPipe (READ from this)

    protected function BlockPipes(bool $bBlock) : int {
        $ar = $this->arPipes;
        $n = 0;
        foreach ($ar as $rPipe) {
            $ok = stream_set_blocking($rPipe,$bBlock);
            if ($ok) $n++;
        }
        return $n;
    }
    protected function ShutPipes() : int {
        $ar = $this->arPipes;
        $n = 0;
        foreach ($ar as $rPipe) {
            $ok = fclose($rPipe);
            if ($ok) $n++;
        }
        return $n;
    }
    protected function CountOpenPipes() : int {
        $ar = $this->arPipes;
        $n = 0;
        foreach ($ar as $ndx => $rPipe) {
            $arStat = fstat($rPipe);
            if (is_array($arStat)) {
                $n++;
            } else {
                echo "PIPE #$ndx is closed.".CRLF;
            }
        }
        return $n;
    }

    // -- ACCESS -- //
    // ++ ACTION ++ // (I/O)

    public function Send() {
        $oSendBuff = $this->GetSendBuff();
        $oRecvBuff = $this->GetRecvBuff();
        $this->BufferToStream($oSendBuff,$this->DestPipe());
        $this->StreamToBuffer($this->SrcePipe(),$oRecvBuff,FALSE);  // ...I *think*.
    }
    /**
     * ACTION: Sends the command, buffers the response, closes the command process
     * HISTORY:
     *  2024-11-24 created. This is experimental; if it works,
     *    this kind of thing should probably be encapsulated in its own class.
     *  2024-11-25 moved from shell/Secure to Shell
     */
    public function Run() : CommOpIface {
        $this->Open();

        $oBuffErrs = $this->GetErrsBuff();
        $oBuffRecv = $this->GetRecvBuff();

        $oBuffErrs->Open();
        $oBuffRecv->Open();

        $this->StreamToBuffer($this->SrcePipe(),$oBuffRecv,TRUE);     // receive any output from process
        $this->StreamToBuffer($this->ErrsPipe(),$oBuffErrs,FALSE);    // check for error messages from process

        $oAct = $this->GetOpStator();
        if ($oBuffErrs->IsMore()) {
            $sMsg = $oBuffErrs->RemoveBytes();

            $oAct->SetOkay(FALSE);
            $oAct->AddMsgObject(new MsgClass($sMsg));
            $oAct->QResponseErr()->SetIt($sMsg);
        } else {
            $oAct->SetOkay(TRUE);
            if ($oBuffRecv->SpentByteCount() === 0) {
                $oAct->AddMsgString('No error messages, but also no regular output received.');
            }
        }

        $this->Shut();
        $oBuffRecv->Shut();
        $oBuffErrs->Shut();

        return $oAct;
    }
    public function SendFromBuffer(BufferIface $oSendBuff, BufferIface $oRecvBuff) {
        $nBufSize = 1024 * 1024;  // 1M buffer

        // 2025-04-12 It just seems like good form to set these:
        $this->SetRecvBuff($oRecvBuff);
        $this->SetSendBuff($oSendBuff);
        // Maybe that will allow spinning off some pieces of the code below.

        #echo $oRecvBuff->ReflectThis()->Report();

        $oSendBuff->Open();
        $oRecvBuff->Open();
        $this->Open();
        $rSrcePipe = $this->SrcePipe(); // recv data from this pipe
        $rDestPipe = $this->DestPipe(); // send to this pipe
        $rErrsPipe = $this->ErrsPipe(); // recv errors from this pipe

        $this->AmHere('SEND BUFFER: '.$oSendBuff->Inspect()->Render());

        // Set the streams to non-blocking mode:
        $n = $this->BlockPipes(FALSE);
        #echo "Pipes unblocked: $n".CRLF;

        $doSendLoop = TRUE;
        while ($oSendBuff->IsMore() && $doSendLoop) {
            #$this->AmHere('IS MORE?['.$oSendBuff->IsMore()."] DO LOOP?[$doSendLoop] buffer class: ".get_class($oSendBuff));
            $sPiece = $oSendBuff->RemoveBytes();
            #$this->AmHere('IS MORE?['.$oSendBuff->IsMore()."] DO LOOP?[$doSendLoop]");
            $ok = fwrite($rDestPipe,$sPiece);

            $sFrag = substr($sPiece,0,32);
            $nPiece = strlen($sPiece);
            #echo "WROTE: [$sFrag]($nPiece) OK?=[$ok]".CRLF;
            if ($ok === FALSE) {
                $doSendLoop = FALSE;
                $this->AmHere('Exiting because of stream output error.'); // Is there any way to get more info?
            } elseif($ok !== strlen($sPiece)) {
                $nlSent = $ok;  // $ok = number of bytes sent
                $nlPiece = strlen($sPiece);
                #$this->AmHere("TODO: handle incomplete write (sent: $nlPiece actual: $ok)");
                $sRem = substr($sPiece,$nlSent);  // skip over bytes already sent, get rest of string
                #echo "REQUEST=[$nlPiece] ACCEPTED=[$nlSent] BUFFER STATE: ".$oSendBuff->StatusLine();
                #fgetc(STDIN); // wait for user input
                $oSendBuff->RestoreBytes($sRem);
            } else {
                $ok = file_put_contents('/home/woozle/Downloads/scratch/dba.dump.sql',$sPiece,FILE_APPEND);
                self::HardAssert(is_int($ok),'Problem with dump file');
            }

            $doRecvLoop = TRUE;
            while ($doRecvLoop) {
                $sPiece = stream_get_contents($rErrsPipe,$nBufSize);
                if (is_string($sPiece)) {
                    if ($sPiece === '') {
                        $doRecvLoop = FALSE; // no more to receive
                    } else {
                        $oScrn = self::Screen();
                        echo CRLF.$oScrn->ErrorIt('Received Error Message').': '.$sPiece.CRLF;
                    }
                } else {
                    $doRecvLoop = FALSE;
                }
            }

            $doRecvLoop = TRUE;
            while ($doRecvLoop) {
                $sPiece = stream_get_contents($rSrcePipe,$nBufSize);
                if (is_string($sPiece)) {
                    if ($sPiece === '') {
                        $doRecvLoop = FALSE; // no more to receive
                    } else {
                        $oRecvBuff->AppendBytes($sPiece);
                    }
                } else {
                    $doRecvLoop = FALSE;  // error (presumably)
                }
            }
          #$this->AmHere('IS MORE?['.$oSendBuff->IsMore()."] DO LOOP?[$doSendLoop]");

        }
        $oSendBuff->Shut();
        $oRecvBuff->Shut();
        $this->Shut();
    }

    // ++ ACTION: internal ++ //

    // ACTION: Read from the given stream into the given pipe. If no input, and $doWait is TRUE, then wait for input before returning.
    protected function StreamToBuffer($rPipe, BufferIface $oBuff,bool $doWait) {
        $nBufSize = 1024 * 1024;  // 1M buffer
        // Process the stream in a loop
        #self::GotToHere();
        stream_set_blocking($rPipe,FALSE);
        $nStart = time();
        $didWait = $doWait;
        while (!feof($rPipe) && $doWait) {
            $sPiece = fread($rPipe, $nBufSize);
            if (empty($sPiece))  {
                // Time-out if I/O hangs for more than 5 seconds:
                $doWait = (time() <= ($nStart+10));    // triggers after n+1 seconds of no data
            } else {
                $oBuff->AppendBytes($sPiece);
                $nStart = time();   // got data, so reset timer
            }
        }
        if ($didWait && !$doWait) echo 'Note: timed out waiting for response.'.CRLF;    // TODO: return this in an ItWent object.
    }
    // ACTION: Write from the given Buffer to the given stream.
    protected function BufferToStream(BufferIface $oData, $rPipe) {
        // Process the stream in a loop
        #echo $oData->ReflectThis()->Report();
        $oData->Open();
        while ($oData->IsMore()) {
            // Time-out if I/O hangs for more than 5 seconds:
            #$ok = set_time_limit(5); // NOTE: seems to always return FALSE
            $sPiece = $oData->RemoveBytes();
            // WORKING HERE
            $ok = fwrite($rPipe,$sPiece);   // TODO: this should check $ok to make sure it wrote the entire buffer -- remaining part needs to be sent again
            if ($ok === FALSE) {
                $this->AmHere('TODO: handle write error');
            } elseif($ok !== strlen($sPiece)) {
                $nlSent = $ok;
                $nlPiece = strlen($sPiece);
                #$this->AmHere("TODO: handle incomplete write (sent: $nlPiece actual: $ok)");
                $sRem = substr($sPiece,$nlSent);
                $oSendBuff->RestoreBytes($sRem);
            } else {
                $ok = file_put_contents('/home/woozle/Downloads/scratch/dba.dump.sql',$sPiece,FILE_APPEND);
                self::HardAssert(is_int($ok),'Problem with dump file');
            }
        }
        $oData->Shut();
    }

    // -- ACTION -- //
    // ++ LIFECYCLE ++ //

    private $rProc;
    // CALLBACK
    protected function ActualOpen() : ActionIface {
        $saCmd = $this->GetCommand();
        $arConf = array(  // configuration for proc_open()
          0 => array("pipe", "r"),  // stdin (input to process)
          1 => array("pipe", "w"),  // stdout (output from process}
          2 => array("pipe", "w")   // stderr (error messages from process)
        );
        $rProc = proc_open($saCmd, $arConf, $arPipes);

        $ok = is_resource($rProc);
        $oAct = new ActionClass;
        $oAct->SetOkay($ok);
        #echo "COMMAND: [$saCmd] OK?[$ok]".CRLF;

        $this->arPipes = $arPipes;
        $this->rProc = $rProc;

        $this->CountOpenPipes();    // 2025-04-12 For now, this is just a check -- prints msg if any pipe did not open.

        return $oAct;
    }
    protected function ActualShut() : ActionIface {
        #$arPipes = $this->arPipes;
        // close the pipes:
        $this->ShutPipes();
        // this must be *after* pipes are closed, to prevent deadlock:
        $nStatus = proc_close($this->rProc);
        $oAct = new ActionClass;
        $oAct->SetOkay($nStatus != -1);
        return $oAct;
    }

    // -- LIFECYCLE -- //
}