Ferreteria/v0.6/clade/IO/Aspect/Connx/Process/Local

From Woozle Writes Code
The item documented here has been removed and is no longer in use.
This functionality has been replaced by IO\Aspect\Connx\Stream\Runner\Local.
clade: IO\Aspect\Connx\Process\Local
Clade Family
Connx Process (none)
Clade Aliases
Alias Clade
APipes* [c,i] IO\Aspect\Connx\aux\APipes\AProcPipes
Action* [c,i] Sys\Events\ItWent
Base* [ca,i] IO\Aspect\Connx
Buffer* [c,i] IO\Aspect\Connx\Stream\Buffer
CmdLineIface Sys\Data\Codec\aux\CmdLine
CommOpObj* [c,i] Ferret\IO\Aspect\Connx\aux\ItWent
CommOpUsr* [i,t] Sys\Events\has\ItWent
LogFile* [c,i] Sys\FileSys\Node\Fi\Log
ModeClass Sys\FileSys\Mode
QObj* [c,i] Data\Mem\QVar\Obj
Readout* [c,i] IO\Aspect\Connx\aux\Readout\VProcess
SelfIface IO\Aspect\Connx\Process
SenderClass IO\Aspect\Connx\Stream\Buffer\Conveyer\Sender
Stream* [c,i] IO\Aspect\Connx\Stream
UpdateIface Sys\Events\Updater
ViewClass IO\Aspect\view\VConnx\VProcess

History

  • 2025-03-17 [THINKING] set_time_limit() and stream_set_timeout() are unnecessary in non-blocking mode, and make things complicated.
  • 2025-03-21 [THINKING] Ultimately, any process begins with a command -- so that's the one thing we can be sure that a Process object will need to be initialized with (using FromCommand()). I'm still using a pseudoconstructor for clarity-through-naming. [UPDATE: see 2026-01-19]
  • 2025-03-28 moved from [WFe]IO -> [WFe]IO\Connx and re-parented to xConnx
  • 2026-01-17 added ItWent, with StatusClass() set to IO\Aspect\Connx\aux\ItWent so we have a place for keeping the stream objects. (These are also stored locally in the Pipe functions; need to reconcile this... Maybe a ProcOp ItWent class that remembers the Proc object instead of the Streams?)
  • 2026-01-19 changing the way this is initialized: we often need to construct the object before we have the command, in order to set options to be used during execution. The real constructor is now public again, FromCommand() is removed, and Run() is now RunCommand() (with a required CmdLine parameter).
  • 2026-02-10 remaining code commented out on 2026-02-02; now removing file
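
The 2026-01-19 change (construct first, set options, supply the command only at run time) might look roughly like the sketch below. This is an illustration, not the Ferreteria code: the class name cProcessSketch and the SetWorkDir() option are made up for the example, and a plain array stands in for the CmdLine parameter. It assumes PHP 7.4 or later, where proc_open() accepts the command as an array.

```php
<?php
// Hypothetical sketch, not the Ferreteria class: options first, command last.
class cProcessSketch {
    private ?string $sWorkDir = NULL;   // hypothetical option, used at run time

    public function __construct() {}    // public, so plain "new" works

    public function SetWorkDir(string $s) : void { $this->sWorkDir = $s; }

    // The command is a required argument instead of stored state.
    // (The history entry describes a CmdLine parameter; an array stands in here.)
    public function RunCommand(array $arCmd) : string {
        $arConf = [
            0 => ['pipe', 'r'],  // stdin (input to process)
            1 => ['pipe', 'w'],  // stdout (output from process)
        ];                       // stderr is inherited from the parent
        $rProc = proc_open($arCmd, $arConf, $arPipes, $this->sWorkDir);
        if (!is_resource($rProc)) {
            throw new RuntimeException('Could not start process.');
        }
        fclose($arPipes[0]);                                // nothing to send
        $sOut = (string)stream_get_contents($arPipes[1]);   // read until EOF
        fclose($arPipes[1]);
        proc_close($rProc);     // only after the pipes are closed
        return $sOut;
    }
}

// Usage: configure before the command is known, then run.
$oProc = new cProcessSketch();
$oProc->SetWorkDir(sys_get_temp_dir());
echo $oProc->RunCommand([PHP_BINARY, '-r', 'echo "hello";']);
```

Because the command is no longer part of the object's state, the same configured object could in principle be reused for several commands.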

Functions

removed

Code

interface iProcess extends BaseIface {
    // SETUP
    static function FromCommand(string|array $saCmd) : self;
    #static function FromBuffers(BufferIface $oRecv, BufferIface $oSend) : self;
    // ACTION
    function Run() : CommOpIface;
}
class cProcess extends BaseClass implements SelfIface {

    // ++ SETUP ++ //

    protected function __construct(){}  // no "new" operator

    static public function FromCommand(string|array $sa) : SelfIface {
        $oThis = new static;
        $oThis->SetCommand($sa);
        return $oThis;
    }

    // ++ SETUP: dynamic / access ++ //

    private $oBufRecv = NULL;
    public function SetRecvBuff(BufferIface $o) { $this->oBufRecv = $o; }
    protected function GetRecvBuff() : BufferIface { return $this->oBufRecv; }

    private $oBufSend = NULL;
    public function SetSendBuff(BufferIface $o) { $this->oBufSend = $o; }
    protected function GetSendBuff() : BufferIface { return $this->oBufSend; }

    private $oBuffErrs = NULL;
    protected function GetErrsBuff() : BufferIface { return $this->oBuffErrs ?? ($this->oBuffErrs = new MemBuffClass); }

    private $saCmd = NULL;
    protected function SetCommand(string|array $sa) { $this->saCmd = $sa; }
    protected function GetCommand() : string|array { return $this->saCmd; }

    private $oStatus = NULL;
    public function SetOpStator(CommOpIface $o) { $this->oStatus = $o; }
    #public function GetOpStator() : CommOpIface { return $this->oStatus ?? ($this->oStatus = new CommOpClass); }
    public function GetOpStator() : CommOpIface { return $this->oStatus; }  // 2025-06-05 Caller needs to set this.

    // -- SETUP -- //
    // ++ ACCESS: internal ++ //

    private $arPipes;
    protected function DestPipe() : mixed { return $this->arPipes[0]; } // was IPipe (WRITE to this)
    protected function SrcePipe() : mixed { return $this->arPipes[1]; } // was SrcePipe (READ from this)
    protected function ErrsPipe() : mixed { return $this->arPipes[2]; } // was EPipe (READ from this)

    protected function BlockPipes(bool $bBlock) : int {
        $ar = $this->arPipes;
        $n = 0;
        foreach ($ar as $rPipe) {
            $ok = stream_set_blocking($rPipe,$bBlock);
            if ($ok) $n++;
        }
        return $n;
    }
    protected function ShutPipes() : int {
        $ar = $this->arPipes;
        $n = 0;
        foreach ($ar as $rPipe) {
            $ok = fclose($rPipe);
            if ($ok) $n++;
        }
        return $n;
    }
    protected function CountOpenPipes() : int {
        $ar = $this->arPipes;
        $n = 0;
        foreach ($ar as $ndx => $rPipe) {
            $arStat = fstat($rPipe);
            if (is_array($arStat)) {
                $n++;
            } else {
                echo "PIPE #$ndx is closed.".CRLF;
            }
        }
        return $n;
    }

    // -- ACCESS -- //
    // ++ ACTION ++ // (I/O)

    public function Send() {
        $oSendBuff = $this->GetSendBuff();
        $oRecvBuff = $this->GetRecvBuff();
        $this->BufferToStream($oSendBuff,$this->DestPipe());
        $this->StreamToBuffer($this->SrcePipe(),$oRecvBuff,FALSE);  // ...I *think*.
    }
    /**
     * ACTION: Sends the command, buffers the response, closes the command process
     * HISTORY:
     *  2024-11-24 created. This is experimental; if it works,
     *    this kind of thing should probably be encapsulated in its own class.
     *  2024-11-25 moved from shell/Secure to Shell
     */
    public function Run() : CommOpIface {
        $this->Open();

        $oBuffErrs = $this->GetErrsBuff();
        $oBuffRecv = $this->GetRecvBuff();

        $oBuffErrs->Open();
        $oBuffRecv->Open();

        $this->StreamToBuffer($this->SrcePipe(),$oBuffRecv,TRUE);     // receive any output from process
        $this->StreamToBuffer($this->ErrsPipe(),$oBuffErrs,FALSE);    // check for error messages from process

        $oAct = $this->GetOpStator();
        if ($oBuffErrs->IsMore()) {
            $sMsg = $oBuffErrs->RemoveBytes();

            $oAct->SetOkay(FALSE);
            $oAct->AddMsgObject(new MsgClass($sMsg));
            $oAct->QResponseErr()->SetIt($sMsg);
        } else {
            $oAct->SetOkay(TRUE);
            if ($oBuffRecv->SpentByteCount() === 0) {
                $oAct->AddMsgString('No error messages, but also no regular output received.');
            }
        }

        $this->Shut();
        $oBuffRecv->Shut();
        $oBuffErrs->Shut();

        return $oAct;
    }
    public function SendFromBuffer(BufferIface $oSendBuff, BufferIface $oRecvBuff) {
        $nBufSize = 1024 * 1024;  // 1M buffer

        // 2025-04-12 It just seems like good form to set these:
        $this->SetRecvBuff($oRecvBuff);
        $this->SetSendBuff($oSendBuff);
        // Maybe that will allow spinning off some pieces of the code below.

        #echo $oRecvBuff->ReflectThis()->Report();

        $oSendBuff->Open();
        $oRecvBuff->Open();
        $this->Open();
        $rSrcePipe = $this->SrcePipe(); // recv data from this pipe
        $rDestPipe = $this->DestPipe(); // send to this pipe
        $rErrsPipe = $this->ErrsPipe(); // recv errors from this pipe

        $this->AmHere('SEND BUFFER: '.$oSendBuff->Inspect()->Render());

        // Set the streams to non-blocking mode:
        $n = $this->BlockPipes(FALSE);
        #echo "Pipes unblocked: $n".CRLF;

        $doSendLoop = TRUE;
        while ($oSendBuff->IsMore() && $doSendLoop) {
            #$this->AmHere('IS MORE?['.$oSendBuff->IsMore()."] DO LOOP?[$doSendLoop] buffer class: ".get_class($oSendBuff));
            $sPiece = $oSendBuff->RemoveBytes();
            #$this->AmHere('IS MORE?['.$oSendBuff->IsMore()."] DO LOOP?[$doSendLoop]");
            $ok = fwrite($rDestPipe,$sPiece);

            $sFrag = substr($sPiece,0,32);
            $nPiece = strlen($sPiece);
            #echo "WROTE: [$sFrag]($nPiece) OK?=[$ok]".CRLF;
            if ($ok === FALSE) {
                $doSendLoop = FALSE;
                $this->AmHere('Exiting because of stream output error.'); // Is there any way to get more info?
            } elseif($ok !== strlen($sPiece)) {
                $nlSent = $ok;  // $ok = number of bytes sent
                $nlPiece = strlen($sPiece);
                #$this->AmHere("TODO: handle incomplete write (sent: $nlPiece actual: $ok)");
                $sRem = substr($sPiece,$nlSent);  // skip over bytes already sent, get rest of string
                #echo "REQUEST=[$nlPiece] ACCEPTED=[$nlSent] BUFFER STATE: ".$oSendBuff->StatusLine();
                #fgetc(STDIN); // wait for user input
                $oSendBuff->RestoreBytes($sRem);
            } else {
                $ok = file_put_contents('/home/woozle/Downloads/scratch/dba.dump.sql',$sPiece,FILE_APPEND);
                self::HardAssert(is_int($ok),'Problem with dump file');
            }

            $doRecvLoop = TRUE;
            while ($doRecvLoop) {
                $sPiece = stream_get_contents($rErrsPipe,$nBufSize);
                if (is_string($sPiece)) {
                    if ($sPiece === '') {
                        $doRecvLoop = FALSE; // no more to receive
                    } else {
                        $oScrn = self::Screen();
                        echo CRLF.$oScrn->ErrorIt('Received Error Message').': '.$sPiece.CRLF;
                    }
                } else {
                    $doRecvLoop = FALSE;
                }
            }

            $doRecvLoop = TRUE;
            while ($doRecvLoop) {
                $sPiece = stream_get_contents($rSrcePipe,$nBufSize);
                if (is_string($sPiece)) {
                    if ($sPiece === '') {
                        $doRecvLoop = FALSE; // no more to receive
                    } else {
                        $oRecvBuff->AppendBytes($sPiece);
                    }
                } else {
                    $doRecvLoop = FALSE;  // error (presumably)
                }
            }
          #$this->AmHere('IS MORE?['.$oSendBuff->IsMore()."] DO LOOP?[$doSendLoop]");

        }
        $oSendBuff->Shut();
        $oRecvBuff->Shut();
        $this->Shut();
    }

    // ++ ACTION: internal ++ //

    // ACTION: Read from the given stream into the given Buffer. If no input, and $doWait is TRUE, then wait for input before returning.
    protected function StreamToBuffer($rPipe, BufferIface $oBuff,bool $doWait) {
        $nBufSize = 1024 * 1024;  // 1M buffer
        // Process the stream in a loop
        #self::GotToHere();
        stream_set_blocking($rPipe,FALSE);
        $nStart = time();
        $didWait = $doWait;
        // Always attempt at least one read, so a non-waiting call still collects whatever is already available.
        do {
            $sPiece = fread($rPipe, $nBufSize);
            $gotData = is_string($sPiece) && ($sPiece !== '');  // empty() would also discard a lone "0"
            if ($gotData) {
                $oBuff->AppendBytes($sPiece);
                $nStart = time();   // got data, so reset timer
            } elseif ($doWait) {
                // Time out if no data arrives for more than 10 seconds:
                $doWait = (time() <= ($nStart+10));
            }
        } while (!feof($rPipe) && ($doWait || $gotData));
        if ($didWait && !$doWait) echo 'Note: timed out waiting for response.'.CRLF;    // TODO: return this in an ItWent object.
    }
    // ACTION: Write from the given Buffer to the given stream.
    protected function BufferToStream(BufferIface $oData, $rPipe) {
        // Process the stream in a loop
        #echo $oData->ReflectThis()->Report();
        $oData->Open();
        while ($oData->IsMore()) {
            // Time-out if I/O hangs for more than 5 seconds:
            #$ok = set_time_limit(5); // NOTE: seems to always return FALSE
            $sPiece = $oData->RemoveBytes();
            // WORKING HERE
            $ok = fwrite($rPipe,$sPiece);   // returns the number of bytes written, or FALSE on error
            if ($ok === FALSE) {
                $this->AmHere('TODO: handle write error');
            } elseif($ok !== strlen($sPiece)) {
                $nlSent = $ok;
                $nlPiece = strlen($sPiece);
                #$this->AmHere("TODO: handle incomplete write (sent: $nlPiece actual: $ok)");
                $sRem = substr($sPiece,$nlSent);    // skip over bytes already sent, get rest of string
                $oData->RestoreBytes($sRem);        // put the unsent remainder back so the next pass resends it
            } else {
                $ok = file_put_contents('/home/woozle/Downloads/scratch/dba.dump.sql',$sPiece,FILE_APPEND);
                self::HardAssert(is_int($ok),'Problem with dump file');
            }
        }
        $oData->Shut();
    }

    // -- ACTION -- //
    // ++ LIFECYCLE ++ //

    private $rProc;
    // CALLBACK
    protected function ActualOpen() : ActionIface {
        $saCmd = $this->GetCommand();
        $arConf = array(  // configuration for proc_open()
          0 => array("pipe", "r"),  // stdin (input to process)
          1 => array("pipe", "w"),  // stdout (output from process)
          2 => array("pipe", "w")   // stderr (error messages from process)
        );
        $rProc = proc_open($saCmd, $arConf, $arPipes);

        $ok = is_resource($rProc);
        $oAct = new ActionClass;
        $oAct->SetOkay($ok);
        #echo "COMMAND: [$saCmd] OK?[$ok]".CRLF;

        $this->arPipes = $ok ? $arPipes : array();  // $arPipes may be left unset if proc_open() fails
        $this->rProc = $rProc;

        if ($ok) $this->CountOpenPipes();    // 2025-04-12 For now, this is just a check -- prints msg if any pipe did not open.

        return $oAct;
    }
    protected function ActualShut() : ActionIface {
        #$arPipes = $this->arPipes;
        // close the pipes:
        $this->ShutPipes();
        // this must be *after* pipes are closed, to prevent deadlock:
        $nStatus = proc_close($this->rProc);
        $oAct = new ActionClass;
        $oAct->SetOkay($nStatus != -1);
        return $oAct;
    }

    // -- LIFECYCLE -- //
}
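
StreamToBuffer() above polls a non-blocking pipe in a loop and keeps its own inactivity timer, which is why (per the 2025-03-17 note) set_time_limit() and stream_set_timeout() are not needed. One possible alternative to polling is to let stream_select() sleep until data arrives or the idle period expires. The sketch below is an assumption-laden illustration, not Ferreteria code: the function name ReadWithIdleTimeout() and its return shape are invented here, and it assumes a POSIX system, since stream_select() does not work on proc_open() pipes under Windows.

```php
<?php
// Hypothetical helper (not part of Ferreteria): drain a non-blocking stream,
// giving up if nothing arrives for $nIdleSecs seconds.
// Returns [string $sData, bool $isTimedOut].
function ReadWithIdleTimeout($rPipe, int $nIdleSecs) : array {
    stream_set_blocking($rPipe, FALSE);
    $sAll = '';
    $isTimedOut = FALSE;
    while (!feof($rPipe)) {
        $arRead = [$rPipe];
        $arWrite = NULL;
        $arExcept = NULL;
        // Sleep until the stream is readable or the idle period expires.
        $n = stream_select($arRead, $arWrite, $arExcept, $nIdleSecs);
        if ($n === FALSE) break;    // select itself failed
        if ($n === 0) {             // idle period expired with no data
            $isTimedOut = TRUE;
            break;
        }
        $sPiece = fread($rPipe, 8192);
        if (is_string($sPiece) && $sPiece !== '') {
            $sAll .= $sPiece;       // each successful read restarts the idle period
        }
    }
    return [$sAll, $isTimedOut];
}
```

A caller would pass the process's stdout or stderr pipe and then check the second element of the result to decide whether to report a timeout, instead of echoing a note from inside the read loop.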