Ferreteria/v0.6/clade/IO/Aspect/Connx/Stream/Finite/Buffer/Conveyer
Jump to navigation
Jump to search
| ||||||||||||||||||||
Code
// Marker interface for the Conveyer: it declares no members of its own, only what BaseIface brings in.
interface iConveyer extends BaseIface {}
// Copies everything available from a source stream into a destination stream,
// handing the destination at most OBUFF_SIZE bytes per push.
class cConveyer extends BaseClass implements iConveyer {

    // ++ CONFIG ++ //

    const OBUFF_SIZE = 1024;        // 1k output buffer: largest piece passed to PushBytes() at once
    #const OBUFF_SIZE = 1024*64;    // 64k output buffer

    // -- CONFIG -- //
    // ++ SETTINGS ++ //

    private static $oUpd=NULL;

    // Returns the shared updater. On first use it is created via UpdateClass::NewIfNull($o);
    // once one is stored, the argument is ignored and the stored object is returned.
    public static function OUpdater(?UpdateIface $o=NULL) : UpdateIface {
        return self::$oUpd ?? (self::$oUpd = UpdateClass::NewIfNull($o));
    }

    private static $oLog=NULL;

    // With an argument: stores it as the shared logger and returns it.
    // Without an argument: returns the stored logger.
    // NOTE(review): if no logger has been stored yet, the no-argument call returns NULL, which
    //  does not satisfy the declared return type and raises a TypeError. LogEntry() reads
    //  self::$oLog directly, so it is not affected.
    public static function OLogger(?LoggerIface $o=NULL) : LoggerIface {
        return is_null($o) ? self::$oLog : (self::$oLog = $o);
    }

    // -- SETTINGS -- //
    // ++ I/O: streams ++ //

    // TODO 2026-01-03 Maybe this should be ConveyText(), because it reads line-by-line and strips certain control characters.
    //  In this usage, $this->sBuff is the "line buffer".
    // NOTE(review): $this->sBuff is not declared in this class; presumably BaseClass (or a trait) provides it -- confirm.
    //
    // ACTION: Opens both streams, then repeatedly pulls data from $oSrce and pushes it to $oDest in
    //  pieces of at most OBUFF_SIZE bytes, until the source reports no more bytes or a push fails.
    //  Both streams are shut before returning.
    // RETURNS: a HowClass object whose okay-flag is
    //  - FALSE if a push to $oDest failed,
    //  - FALSE if $oSrce had no bytes at all when we started (see NOTE 2026-01-02 below),
    //  - TRUE otherwise.
    public function Convey(StreamIface $oSrce, StreamIface $oDest) : HowIface {
        $oSrce->Open();
        $oDest->Open();
        $oHow = new HowClass;

        $doLoop = $ok = $oSrce->HasBytes();
        $this->AmHere("DO LOOP? [$doLoop][$ok]");

        self::OUpdater()->OnBefore();
        while ($doLoop) {   // while there's input to process --
            // Get what's currently buffered (if anything). The ?? guards against the
            // property being unset/NULL, so the strict string comparisons below stay valid.
            $sGot = (string)($this->sBuff ?? '');

            // refill buffer if needed
            if ($sGot === '') {
                // pull the next chunk from the source stream
                $oGot = $oSrce->PullBytes();
                $qsGot = $oGot->QData();
                $sGot .= $qsGot->GetItNz();
                $doLoop = $oSrce->HasBytes();
                $nlGot = strlen($sGot);
                $this->LogEntry("Got line ($nlGot bytes)");
                $this->sBuff = $sGot;               // update object-global for stats readout
                self::OUpdater()->OnChange();       // update the stats readout
            }

            // Repeatedly break $sGot into OBUFF_SIZE pieces and try to send/write them.
            // $sPut is the "burst": the piece currently being sent. It is refilled from $sGot
            // whenever it is empty. The loop runs until BOTH are empty (so an unsent tail of the
            // last burst is never dropped) or a push fails.
            $ok = TRUE;
            $sPut = '';
            while ($ok && (($sGot !== '') || ($sPut !== ''))) {
                $this->AmHere('Buff length: '.strlen($sGot));

                // Strict comparison on purpose: empty() would treat a leftover burst of "0"
                // as empty and overwrite it, losing that byte.
                if ($sPut === '') {
                    $sPut = substr($sGot,0,self::OBUFF_SIZE);   // get piece to send
                    $nlPut = strlen($sPut);                     // get length of piece
                    $sGot = substr($sGot,$nlPut);               // remove those bytes from the input buffer
                    $this->LogEntry("GOT $nlPut bytes from line-buffer");
                    $this->sBuff = $sGot;                       // update object-global
                    self::OUpdater()->OnChange();               // update the stats readout
                }

                // The loop condition plus the refill above guarantee $sPut is non-empty here.
                $nlTry = strlen($sPut);             // how many bytes are we trying?
                $oOp = $oDest->PushBytes($sPut);    // try to send them
                if (!$oOp->GetOkay()) {
                    $doLoop = FALSE;    // error when sending: something is broken, so stop
                    $ok = FALSE;        // stop (inner) burst-loop too
                    $this->LogEntry("ERROR sending $nlTry bytes");
                } else {
                    // NOTE(review): this used to read an undefined variable ($okLen), which left
                    //  the burst unchanged and resent it forever. How $oOp reports the number of
                    //  bytes actually accepted is not visible from here, so a successful push is
                    //  treated as having accepted the whole burst. If PushBytes() can succeed
                    //  partially, replace this with the real count -- TODO confirm.
                    $nlDone = $nlTry;
                    $nRemBrst = $nlTry - $nlDone;
                    $nRemBuff = strlen($sGot);
                    $sStatus = (($nlDone === $nlTry) ? 'ok!' : "ONLY SENT $nlDone");
                    $sLog = "TRIED $nlTry bytes: $sStatus ($nRemBrst left in burst, $nRemBuff in buffer)";
                    // ++ DEBUG
                    $this->LogEntry($sLog);
                    $this->LogEntry("SENT: ".substr($sPut,0,$nlDone));
                    // -- DEBUG
                    $sPut = substr($sPut,$nlDone);  // remove bytes-accepted from burst-buffer
                }

                self::OUpdater()->OnCheck();    // check on other activities
            }

            // $ok now reflects only whether sending succeeded. (It was previously overwritten
            // with $doLoop here, which is already FALSE once the source runs dry, so a fully
            // successful transfer was reported as a failure.)

            // prep for next loop-thru:
            if (!$oSrce->HasBytes()) { $doLoop = FALSE; }
        }
        self::OUpdater()->OnAfter();

        $oSrce->Shut();
        $oDest->Shut();

        $oHow->SetOkay($ok);    // NOTE 2026-01-02: This will error if there's no data to send. Is that a problem?
        return $oHow;
    }

    // ++ I/O: logs ++ //

    // Writes $s to the shared logger, if one has been set via OLogger(); otherwise does nothing.
    protected function LogEntry(string $s) {
        if (is_object(self::$oLog)) {
            self::$oLog->WriteEntry($s);
        }
    }

    // -- I/O -- //
}