Ferreteria/v0.6/clade/IO/Aspect/Connx/aux/QStream
Jump to navigation
Jump to search
| ||||||||||||||||||
About
- Purpose: wraps a stream resource-type
  - See List of Resource Types. PHP also lets you create your own resource-types and wrap them with a class (see e.g. stream_wrapper_register()), and possibly that should be integrated into this at some point.
History
- 2025-12-08 [xTODO] Explore the overlap between this and Sys\FileSys\Aspect\InOut.
  - This clade handles a file as a stream, while InOut handles the broader random-access functionality (which allows streaming but doesn't focus on it).
  - This clade can also handle non-file, non-random-access streams.
- 2025-12-24 renamed from QPipe (because it originally handled resources in the array set by
proc_open()) to QStream
Code
As of 2025-12-24 (FReadAll() is still in development):
// PURPOSE: public contract for an object that wraps a stream resource
interface iQStream extends BaseIface {

    // ACTION: switch the stream between blocking and non-blocking mode -- see stream_set_blocking()
    public function SetBlocking(bool $bOn) : bool;

    // ACTION: read everything the stream currently has to offer
    public function SReadAll() : bool|string;

    // ACTION: try once to write the data
    public function WriteTry(string $sData) : bool|int;

    // ACTION: write all the data, retrying as needed
    public function WriteAll(string $sData) : bool|int;
}
// PURPOSE: wraps a stream resource and provides read-all / write-all helpers on top of it.
// NOTE: GetIt() is defined outside this listing; every method here passes its result
//   directly to PHP stream functions, so it is used as the stream resource.
class cQStream extends BaseClass implements iQStream {

    // ++ CONFIG ++ //

    const MAX_TRIES   = 5;          // consecutive zero-byte writes tolerated before WriteAll() gives up
    const WAIT_USEC   = 250000;     // pause after a zero-byte write: 0.25 sec, as an int because usleep() takes an int
    const TIMEOUT_SEC = 10;         // FReadAll(): seconds without any data before the read is declared timed-out
    const IBUFF_SIZE  = 1024*1024;  // 1M input buffer
    const OBUFF_SIZE  = 1024*64;    // 64k output buffer

    // -- CONFIG -- //
    // ++ MODE ++ //

    // ACTION: turn blocking mode on or off for the stream
    // RETURNS: whatever stream_set_blocking() returns (TRUE on success)
    public function SetBlocking(bool $bOn) : bool {
        return stream_set_blocking($this->GetIt(),$bOn);
    }

    // -- MODE -- //
    // ++ INTERNALS ++ //

    // RETURNS: the fstat() array for the stream
    // NOTE(review): fstat() returns FALSE on failure, which would violate the declared
    //   array return-type and raise a TypeError -- confirm whether callers rely on that.
    protected function GetStatus() : array { return fstat($this->GetIt()); }

    // -- INTERNALS -- //
    // ++ ACTION ++ //

    // ACTION: read from the stream, up to IBUFF_SIZE bytes at a time, until a read comes back empty
    // RETURNS: everything received ('' if nothing was received)
    // NOTE: if a read fails after some data has already arrived, this reports the
    //   situation via CodingPrompt() and halts the script.
    public function SReadAll() : bool|string {
        $doRecvLoop = TRUE;
        $sRtn = '';
        $rPipe = $this->GetIt();
        while ($doRecvLoop) {   // RESPONSE loop
            $sPiece = stream_get_contents($rPipe,self::IBUFF_SIZE);
            if (is_string($sPiece)) {
                if ($sPiece === '') {
                    // no more to receive -- we're done here:
                    $doRecvLoop = FALSE;
                } else {
                    // non-empty string received -- keep it:
                    $sRtn .= $sPiece;
                }
            } else {
                // There was a problem; finish up.
                $doRecvLoop = FALSE;
                if ($sRtn !== '') {
                    echo $this->CodingPrompt("Data received before prompt -- need a way to handle it. DATA: $sRtn"); die();
                }
            }
        }
        return $sRtn;
    }

    // PURPOSE: Like SReadAll(), but the stream must be a file (perhaps this should be another class?)
    // INPUT: $doWait
    //   TRUE  = keep polling after an empty read, until EOF or until TIMEOUT_SEC seconds pass with no data
    //   FALSE = stop at the first empty read (take only what is available right now)
    // RETURNS: result object; its data is set only if at least one non-empty chunk was read,
    //   and TimedOut() is passed TRUE only if a waiting read gave up.
    // FIXES:
    //   * A chunk consisting of the single character "0" is data; empty() used to discard it.
    //   * With $doWait=FALSE the loop was never entered, so nothing could ever be read.
    // TODO: set error in $oRes if there's a read-error
    // NOTE(review): on a non-blocking stream the waiting path polls without sleeping; consider a short pause.
    public function FReadAll(bool $doWait) : ReadResultIface {
        $nStart = time();
        $sRtn = NULL;
        $rPipe = $this->GetIt();
        $didTimeOut = FALSE;
        while (!feof($rPipe)) {
            $sPiece = fread($rPipe,self::IBUFF_SIZE);
            if ($sPiece === FALSE || $sPiece === '') {
                // nothing arrived on this attempt
                if (!$doWait) {
                    break;  // caller does not want to wait for more
                }
                if (time() > ($nStart + self::TIMEOUT_SEC)) {
                    // no data for more than TIMEOUT_SEC seconds -- give up
                    $didTimeOut = TRUE;
                    break;
                }
            } else {
                $sRtn .= $sPiece;
                $nStart = time(); // got data, so reset timer
            }
        }
        $oRes = new ReadResultClass;
        if (is_string($sRtn)) {
            $oRes->QData()->SetIt($sRtn);
        }
        $oRes->TimedOut($didTimeOut);
        return $oRes;
    }

    // ACTION: try once to write the data
    // RETURNS: whatever fwrite() returns -- number of bytes written, or FALSE on failure.
    //   Warnings/notices from fwrite() are suppressed; callers must check the return value.
    public function WriteTry(string $sData) : bool|int { return @fwrite($this->GetIt(),$sData); }

    // ACTION: write all of $sData, at most OBUFF_SIZE bytes per attempt, retrying as needed
    // RETURNS:
    //   int   = total number of bytes written (equals strlen($sData); 0 for an empty string)
    //   FALSE = a write failed outright, or MAX_TRIES consecutive attempts wrote nothing
    // FIXES:
    //   * The inner loop was guarded by a counter that was always 0, so nothing was ever
    //     sent and the outer loop never ended for non-empty input.
    //   * The return value was undefined for empty input.
    //   * Only the bytes actually written are now removed from the pending data, so a
    //     partial write no longer loses the unwritten remainder.
    public function WriteAll(string $sData) : bool|int {
        $nTry = 0;          // consecutive zero-byte writes so far (maintained by WriteBurst())
        $nSentTotal = 0;
        $sToSendFull = $sData;
        while ($sToSendFull !== '') {
            $this->AmHere("TOP OUTER loop: length=".strlen($sToSendFull));
            $sToSendBurst = substr($sToSendFull,0,self::OBUFF_SIZE);  // next chunk to send (from start of remainder)
            $this->AmHere("BEFORE BURST: nTry=$nTry burst length=".strlen($sToSendBurst));
            $nSent = $this->WriteBurst($sToSendBurst,$nTry);
            $this->AmHere("AFTER BURST: nTry=$nTry burst length=".strlen($sToSendBurst));
            if ($nSent === FALSE) {
                return FALSE;   // hard write error
            }
            if ($nTry >= self::MAX_TRIES) {
                return FALSE;   // stream is not accepting data -- stop retrying
            }
            $nSentTotal += $nSent;
            $sToSendFull = substr($sToSendFull,$nSent);   // drop only what was really written
            $this->AmHere("END OUTER loop: length=".strlen($sToSendFull));
        }
        return $nSentTotal;
    }

    // ACTION: make one write attempt for a single chunk and keep the retry counter up to date
    // INPUT:
    //   $sToSend = the chunk to write
    //   $nTry (in/out) = count of consecutive attempts that wrote nothing;
    //     incremented (after a WAIT_USEC pause) when 0 bytes were written, reset to 0 on any progress
    // RETURNS: number of bytes written (possibly 0, or fewer than requested), or FALSE if the write failed
    // FIXES: previously never returned a value despite the declared return-type, and
    //   referred to variables that did not exist in this scope.
    protected function WriteBurst(string $sToSend, int &$nTry) : int|false {
        if ($sToSend === '') {
            return 0;   // nothing to do
        }
        $ok = $this->WriteTry($sToSend);
        if (!is_int($ok)) {
            return FALSE;
        }
        if ($ok === 0) {
            // nothing accepted -- count the attempt and give the other end time to drain
            $nTry++;
            usleep(self::WAIT_USEC);
            echo '.'; // TODO: remove this or have a callback for showing retries
        } else {
            // SOME data sent -- reset counter (we're making progress):
            $nTry = 0;
        }
        return $ok;
    }

    // -- ACTION -- //
}