Ferreteria/v0.6/clade/IO/Aspect/Connx/Process: Difference between revisions
Jump to navigation
Jump to search
No edit summary |
No edit summary |
||
| Line 14: | Line 14: | ||
}} | }} | ||
==History== | |||
* '''{{fmt/date|2025|03|17}}''' [THINKING] <code>[https://www.php.net/manual/en/function.set-time-limit.php set_time_limit]()</code> and <code>[https://www.php.net/manual/en/function.stream-set-timeout.php stream_set_timeout]()</code> are unnecessary in non-blocking mode, and make things complicated. | |||
* '''{{fmt/date|2025|03|21}}''' [THINKING] Ultimately, any process begins with a command -- so that's the one thing we can be sure that a Process object will need to be initialized with (using <code>{{l/ver/fx|FromCommand}}()</code>). I'm still using a pseudoconstructor for clarity-through-naming. | |||
* '''{{fmt/date|2025|03|28}}''' moved from [WFe]<code>IO</code> -> [WFe]<code>IO\Connx</code> and re-parented to xConnx | |||
==Code== | ==Code== | ||
''as of {{fmt/date|2025|11|26}}:'' | * [[/@removed/]] | ||
* ''as of {{fmt/date|2025|11|26}}:'' | |||
{{fmt/php/block|1= | {{fmt/php/block|1= | ||
interface iProcess extends BaseIface { | interface iProcess extends BaseIface { | ||
Latest revision as of 22:48, 29 November 2025
| ||||||||||||||||||||||||
History
- 2025-03-17 [THINKING]
set_time_limit()andstream_set_timeout()are unnecessary in non-blocking mode, and make things complicated. - 2025-03-21 [THINKING] Ultimately, any process begins with a command -- so that's the one thing we can be sure that a Process object will need to be initialized with (using
FromCommand()). I'm still using a pseudoconstructor for clarity-through-naming. - 2025-03-28 moved from [WFe]
IO-> [WFe]IO\Connxand re-parented to xConnx
Code
interface iProcess extends BaseIface {
// SETUP
static function FromCommand(string|array $saCmd) : self;
#static function FromBuffers(BufferIface $oRecv, BufferIface $oSend) : self;
// ACTION
function Run() : CommOpIface;
}
class cProcess extends BaseClass implements SelfIface {
// ++ SETUP ++ //
protected function __construct(){} // no "new" operator
static public function FromCommand(string|array $sa) : SelfIface {
$oThis = new static;
$oThis->SetCommand($sa);
return $oThis;
}
// ++ SETUP: dynamic / access ++ //
private $oBufRecv = NULL;
public function SetRecvBuff(BufferIface $o) { $this->oBufRecv = $o; }
protected function GetRecvBuff() : BufferIface { return $this->oBufRecv; }
private $oBufSend = NULL;
public function SetSendBuff(BufferIface $o) { $this->oBufSend = $o; }
protected function GetSendBuff() : BufferIface { return $this->oBufSend; }
private $oBuffErrs = NULL;
protected function GetErrsBuff() : BufferIface { return $this->oBuffErrs ?? ($this->oBuffErrs = new MemBuffClass); }
private $saCmd = NULL;
protected function SetCommand(string|array $sa) { $this->saCmd = $sa; }
protected function GetCommand() : string|array { return $this->saCmd; }
private $oStatus = NULL;
public function SetOpStator(CommOpIface $o) { $this->oStatus = $o; }
#public function GetOpStator() : CommOpIface { return $this->oStatus ?? ($this->oStatus = new CommOpClass); }
public function GetOpStator() : CommOpIface { return $this->oStatus; } // 2025-06-05 Caller needs to set this.
// -- SETUP -- //
// ++ ACCESS: internal ++ //
private $arPipes;
protected function DestPipe() : mixed { return $this->arPipes[0]; } // was IPipe (WRITE to this)
protected function SrcePipe() : mixed { return $this->arPipes[1]; } // was SrcePipe (READ from this)
protected function ErrsPipe() : mixed { return $this->arPipes[2]; } // was EPipe (READ from this)
protected function BlockPipes(bool $bBlock) : int {
$ar = $this->arPipes;
$n = 0;
foreach ($ar as $rPipe) {
$ok = stream_set_blocking($rPipe,$bBlock);
if ($ok) $n++;
}
return $n;
}
protected function ShutPipes() : int {
$ar = $this->arPipes;
$n = 0;
foreach ($ar as $rPipe) {
$ok = fclose($rPipe);
if ($ok) $n++;
}
return $n;
}
protected function CountOpenPipes() : int {
$ar = $this->arPipes;
$n = 0;
foreach ($ar as $ndx => $rPipe) {
$arStat = fstat($rPipe);
if (is_array($arStat)) {
$n++;
} else {
echo "PIPE #$ndx is closed.".CRLF;
}
}
return $n;
}
// -- ACCESS -- //
// ++ ACTION ++ // (I/O)
public function Send() {
$oSendBuff = $this->GetSendBuff();
$oRecvBuff = $this->GetRecvBuff();
$this->BufferToStream($oSendBuff,$this->DestPipe());
$this->StreamToBuffer($this->SrcePipe(),$oRecvBuff,FALSE); // ...I *think*.
}
/**
* ACTION: Sends the command, buffers the response, closes the command process
* HISTORY:
* 2024-11-24 created. This is experimental; if it works,
* this kind of thing should probably be encapsulated in its own class.
* 2024-11-25 moved from shell/Secure to Shell
*/
public function Run() : CommOpIface {
$this->Open();
$oBuffErrs = $this->GetErrsBuff();
$oBuffRecv = $this->GetRecvBuff();
$oBuffErrs->Open();
$oBuffRecv->Open();
$this->StreamToBuffer($this->SrcePipe(),$oBuffRecv,TRUE); // receive any output from process
$this->StreamToBuffer($this->ErrsPipe(),$oBuffErrs,FALSE); // check for error messages from process
$oAct = $this->GetOpStator();
if ($oBuffErrs->IsMore()) {
$sMsg = $oBuffErrs->RemoveBytes();
$oAct->SetOkay(FALSE);
$oAct->AddMsgObject(new MsgClass($sMsg));
$oAct->QResponseErr()->SetIt($sMsg);
} else {
$oAct->SetOkay(TRUE);
if ($oBuffRecv->SpentByteCount() === 0) {
$oAct->AddMsgString('No error messages, but also no regular output received.');
}
}
$this->Shut();
$oBuffRecv->Shut();
$oBuffErrs->Shut();
return $oAct;
}
public function SendFromBuffer(BufferIface $oSendBuff, BufferIface $oRecvBuff) {
$nBufSize = 1024 * 1024; // 1M buffer
// 2025-04-12 It just seems like good form to set these:
$this->SetRecvBuff($oRecvBuff);
$this->SetSendBuff($oSendBuff);
// Maybe that will allow spinning off some pieces of the code below.
#echo $oRecvBuff->ReflectThis()->Report();
$oSendBuff->Open();
$oRecvBuff->Open();
$this->Open();
$rSrcePipe = $this->SrcePipe(); // recv data from this pipe
$rDestPipe = $this->DestPipe(); // send to this pipe
$rErrsPipe = $this->ErrsPipe(); // recv errors from this pipe
$this->AmHere('SEND BUFFER: '.$oSendBuff->Inspect()->Render());
// Set the streams to non-blocking mode:
$n = $this->BlockPipes(FALSE);
#echo "Pipes unblocked: $n".CRLF;
$doSendLoop = TRUE;
while ($oSendBuff->IsMore() && $doSendLoop) {
#$this->AmHere('IS MORE?['.$oSendBuff->IsMore()."] DO LOOP?[$doSendLoop] buffer class: ".get_class($oSendBuff));
$sPiece = $oSendBuff->RemoveBytes();
#$this->AmHere('IS MORE?['.$oSendBuff->IsMore()."] DO LOOP?[$doSendLoop]");
$ok = fwrite($rDestPipe,$sPiece);
$sFrag = substr($sPiece,0,32);
$nPiece = strlen($sPiece);
#echo "WROTE: [$sFrag]($nPiece) OK?=[$ok]".CRLF;
if ($ok === FALSE) {
$doSendLoop = FALSE;
$this->AmHere('Exiting because of stream output error.'); // Is there any way to get more info?
} elseif($ok !== strlen($sPiece)) {
$nlSent = $ok; // $ok = number of bytes sent
$nlPiece = strlen($sPiece);
#$this->AmHere("TODO: handle incomplete write (sent: $nlPiece actual: $ok)");
$sRem = substr($sPiece,$nlSent); // skip over bytes already sent, get rest of string
#echo "REQUEST=[$nlPiece] ACCEPTED=[$nlSent] BUFFER STATE: ".$oSendBuff->StatusLine();
#fgetc(STDIN); // wait for user input
$oSendBuff->RestoreBytes($sRem);
} else {
$ok = file_put_contents('/home/woozle/Downloads/scratch/dba.dump.sql',$sPiece,FILE_APPEND);
self::HardAssert(is_int($ok),'Problem with dump file');
}
$doRecvLoop = TRUE;
while ($doRecvLoop) {
$sPiece = stream_get_contents($rErrsPipe,$nBufSize);
if (is_string($sPiece)) {
if ($sPiece === '') {
$doRecvLoop = FALSE; // no more to receive
} else {
$oScrn = self::Screen();
echo CRLF.$oScrn->ErrorIt('Received Error Message').': '.$sPiece.CRLF;
}
} else {
$doRecvLoop = FALSE;
}
}
$doRecvLoop = TRUE;
while ($doRecvLoop) {
$sPiece = stream_get_contents($rSrcePipe,$nBufSize);
if (is_string($sPiece)) {
if ($sPiece === '') {
$doRecvLoop = FALSE; // no more to receive
} else {
$oRecvBuff->AppendBytes($sPiece);
}
} else {
$doRecvLoop = FALSE; // error (presumably)
}
}
#$this->AmHere('IS MORE?['.$oSendBuff->IsMore()."] DO LOOP?[$doSendLoop]");
}
$oSendBuff->Shut();
$oRecvBuff->Shut();
$this->Shut();
}
// ++ ACTION: internal ++ //
// ACTION: Read from the given stream into the given pipe. If no input, and $doWait is TRUE, then wait for input before returning.
protected function StreamToBuffer($rPipe, BufferIface $oBuff,bool $doWait) {
$nBufSize = 1024 * 1024; // 1M buffer
// Process the stream in a loop
#self::GotToHere();
stream_set_blocking($rPipe,FALSE);
$nStart = time();
$didWait = $doWait;
while (!feof($rPipe) && $doWait) {
$sPiece = fread($rPipe, $nBufSize);
if (empty($sPiece)) {
// Time-out if I/O hangs for more than 5 seconds:
$doWait = (time() <= ($nStart+10)); // triggers after n+1 seconds of no data
} else {
$oBuff->AppendBytes($sPiece);
$nStart = time(); // got data, so reset timer
}
}
if ($didWait && !$doWait) echo 'Note: timed out waiting for response.'.CRLF; // TODO: return this in an ItWent object.
}
// ACTION: Write from the given Buffer to the given stream.
protected function BufferToStream(BufferIface $oData, $rPipe) {
// Process the stream in a loop
#echo $oData->ReflectThis()->Report();
$oData->Open();
while ($oData->IsMore()) {
// Time-out if I/O hangs for more than 5 seconds:
#$ok = set_time_limit(5); // NOTE: seems to always return FALSE
$sPiece = $oData->RemoveBytes();
// WORKING HERE
$ok = fwrite($rPipe,$sPiece); // TODO: this should check $ok to make sure it wrote the entire buffer -- remaining part needs to be sent again
if ($ok === FALSE) {
$this->AmHere('TODO: handle write error');
} elseif($ok !== strlen($sPiece)) {
$nlSent = $ok;
$nlPiece = strlen($sPiece);
#$this->AmHere("TODO: handle incomplete write (sent: $nlPiece actual: $ok)");
$sRem = substr($sPiece,$nlSent);
$oSendBuff->RestoreBytes($sRem);
} else {
$ok = file_put_contents('/home/woozle/Downloads/scratch/dba.dump.sql',$sPiece,FILE_APPEND);
self::HardAssert(is_int($ok),'Problem with dump file');
}
}
$oData->Shut();
}
// -- ACTION -- //
// ++ LIFECYCLE ++ //
private $rProc;
// CALLBACK
protected function ActualOpen() : ActionIface {
$saCmd = $this->GetCommand();
$arConf = array( // configuration for proc_open()
0 => array("pipe", "r"), // stdin (input to process)
1 => array("pipe", "w"), // stdout (output from process}
2 => array("pipe", "w") // stderr (error messages from process)
);
$rProc = proc_open($saCmd, $arConf, $arPipes);
$ok = is_resource($rProc);
$oAct = new ActionClass;
$oAct->SetOkay($ok);
#echo "COMMAND: [$saCmd] OK?[$ok]".CRLF;
$this->arPipes = $arPipes;
$this->rProc = $rProc;
$this->CountOpenPipes(); // 2025-04-12 For now, this is just a check -- prints msg if any pipe did not open.
return $oAct;
}
protected function ActualShut() : ActionIface {
#$arPipes = $this->arPipes;
// close the pipes:
$this->ShutPipes();
// this must be *after* pipes are closed, to prevent deadlock:
$nStatus = proc_close($this->rProc);
$oAct = new ActionClass;
$oAct->SetOkay($nStatus != -1);
return $oAct;
}
// -- LIFECYCLE -- //
}