Ferreteria/v0.6/clade/IO/Aspect/Connx/Conveyer
Jump to navigation
Jump to search
| ||||||||||||||||||||||||||
About
- Note that this class only determines the class of the Updater, which does not talk directly to the output device.
- The Updater sends all output via the Panel which is set in the app config.
History
- 2026-02-19 xTODO Does this really have any business descending from Buffer? Maybe it should be in Aspect/aux or something?
- 2026-02-26 Trying a relatively-mild refactoring in which it now descends from Connx
- 2026-04-08 This was working pretty well but then I broke something somewhere, so now I'm rewriting it to be more fixable.
Code
- Snapshot as of 2026-04-08: this had been working until I broke something, so I'm preserving its current (broken) state here before starting a substantial rewrite:
// PURPOSE: contract for an object that moves bytes from one stream (source) to another (destination).
interface iConveyer extends BaseIface {

    // ++ SETUP: static ++ //

    // ACTION: build a conveyer for the given source/destination stream pair
    public static function FromPair(StreamIface $oSrce, StreamIface $oDest) : SelfIface;

    // ++ SETUP: dynamic ++ //

    // ACTION: attach the source and destination streams to an existing conveyer
    public function Connect(StreamIface $oSrce, StreamIface $oDest);

    // -- SETUP -- //
    // ++ EVENTS ++ //

    // Notification hooks, to be invoked around a conveying session (before the first step / after the last).
    public function OnBefore();
    public function OnAfter();

    // -- EVENTS -- //
    // ++ I/O ++ //

    // ACTION: perform one conveying step
    // RETURNS: operation-status object describing how the step went
    public function ConveyCheck() : OpConveyIface;

    // Retired one-shot variant, kept for reference only:
    #function ConveyNow(StreamIface $oSrce, StreamIface $oDest) : OpConveyIface;

    // -- I/O -- //
}
// PURPOSE: moves bytes from a source stream to a destination stream, one step per ConveyCheck() call.
//  Data is staged in two private buffers:
//    $sGot = bytes pulled from the source which have not yet been scheduled for sending
//    $sPut = the piece currently being pushed to the destination (may survive across calls
//            if the destination only accepts part of it)
class cConveyer extends BaseClass implements iConveyer {

    // ++ CONFIG ++ //

    #const IBUFF_SIZE = 1024*64;    // 64k input buffer
    const IBUFF_SIZE = 1024*1024;   // 1M input buffer: max bytes requested per PullBytes() call
    #const OBUFF_SIZE = 1024;       // 1k output buffer
    #const OBUFF_SIZE = 1024*64;    // 64k output buffer
    const OBUFF_SIZE = 1024*1024;   // 1M output buffer: max size of each piece moved into $sPut

    public function SType() : string { return 'conveyer'; }

    // -- CONFIG -- //
    // ++ SETUP: static ++ //

    // RETURNS: a new instance (of the called class) already connected to the given streams
    public static function FromPair(StreamIface $oSrce, StreamIface $oDest) : SelfIface {
        $oThis = new static;
        $oThis->Connect($oSrce,$oDest);
        return $oThis;
    }

    // ++ SETUP: dynamic ++ //

    // ACTION: remember which streams to convey from/to. Does not open either stream.
    public function Connect(StreamIface $oSrce, StreamIface $oDest) {
        $this->oSrce = $oSrce;
        $this->oDest = $oDest;
    }

    // -- SETUP -- //
    // ++ SETTINGS ++ //

    // NOTE: both stream accessors will fail their return-type check if called before Connect().
    private $oSrce;
    protected function OSrceStream() : StreamIface { return $this->oSrce; }
    private $oDest;
    protected function ODestStream() : StreamIface { return $this->oDest; }

    // Logger shared by all conveyers (static). Optional: LogEntry() is a no-op when unset.
    private static $oLog=NULL;
    // ACTION: with an argument, stores it as the shared logger and returns it;
    //  with no argument, returns the stored logger.
    // NOTE(review): calling this with no argument before a logger has been set returns NULL,
    //  which violates the declared (non-nullable) return type -- confirm callers always set it first.
    public static function OLogger(?LoggerIface $o=NULL) : LoggerIface { return is_null($o) ? self::$oLog : (self::$oLog = $o); }

    // -- SETTINGS -- //
    // ++ OBJECTS ++ //

    private $oOpConveyFactory = NULL;
    // RETURNS: the status object for a convey operation, obtained from a factory which is created on first use.
    protected function OOpConvey() : OpConveyIface {
        $oFact = ($this->oOpConveyFactory ??= FactoryClass::FromClass(OpConveyClass::class));
        return $oFact->OTarget();
    }

    // -- OBJECTS -- //
    // ++ LIFECYCLE ++ //

    // ACTION: open both streams (source first)
    // RETURNS: an open-operation status which has assimilated the results of both opens
    protected function ActualOpen() : OpOpenIface {
        $oActSrce = $this->OSrceStream()->Open();
        $oActDest = $this->ODestStream()->Open();
        $oAct = $this->OpOpen();
        $oAct->Assimilate($oActSrce);
        $oAct->Assimilate($oActDest);
        return $oAct;
    }

    // ACTION: shut both streams (source first)
    // RETURNS: a shut-operation status which has assimilated the results of both shuts
    protected function ActualShut() : OpShutIface {
        $oActSrce = $this->OSrceStream()->Shut();
        $oActDest = $this->ODestStream()->Shut();
        $oAct = $this->OpShut();
        $oAct->Assimilate($oActSrce);
        $oAct->Assimilate($oActDest);
        return $oAct;
    }

    // -- LIFECYCLE -- //
    // ++ EVENTS ++ //

    // ACTION: forward the "before" event to the updaters of both streams (source first)
    public function OnBefore() {
        $qoUpdSrce = $this->OSrceStream()->QOUpdater();
        $qoUpdDest = $this->ODestStream()->QOUpdater();
        $qoUpdSrce->OnBefore();
        $qoUpdDest->OnBefore();
    }

    // ACTION: forward the "after" event to the updaters of both streams (source first)
    public function OnAfter() {
        $qoUpdSrce = $this->OSrceStream()->QOUpdater();
        $qoUpdDest = $this->ODestStream()->QOUpdater();
        $qoUpdSrce->OnAfter();
        $qoUpdDest->OnAfter();
    }

    // -- EVENTS -- //
    // ++ I/O: streams ++ //

    // ACTION: perform one conveying step:
    //  1. if the input buffer ($sGot) is empty, pull up to IBUFF_SIZE bytes from the source;
    //  2. if no piece is pending ($sPut empty), cut the next piece (up to OBUFF_SIZE bytes) off $sGot;
    //  3. try to push the pending piece to the destination.
    //  Whatever the destination does not accept stays in $sPut and is retried on the next call.
    // ASSUMES: Srce and Dest are both open. (It doesn't really make sense to force them open/shut here.)
    // RETURNS: convey-operation status; okay is FALSE only if the push to the destination reported failure.
    // 2026-02-22 TODO: maybe this should use stream_copy_to_stream().
    public function ConveyCheck() : OpConveyIface {
        $oSrce = $this->OSrceStream();
        $oDest = $this->ODestStream();
        $qoUpdSrce = $oSrce->QOUpdater();
        $qoUpdDest = $oDest->QOUpdater();
        $oHow = $this->OOpConvey();
        $ok = TRUE;

        // $sGot refill check.
        // Strict comparison rather than empty(): empty('0') is TRUE, and '0' is a valid one-byte payload.
        if ($this->sGot === '') {
            $this->CheckConveyIn($oSrce);
            $qoUpdSrce->OnIterate();
            $qoUpdDest->OnIterate();
        }

        // Only cut a new piece when the previous one has been completely sent;
        //  otherwise the unsent remainder would be overwritten (i.e. lost).
        if (($this->sPut === '') && ($this->sGot !== '')) {
            $this->CheckConveyOut1();
            $qoUpdSrce->OnIterate();
            $qoUpdDest->OnIterate();
        }

        // Send whatever is pending -- including a remainder left over after $sGot has drained.
        if ($this->sPut !== '') {
            // "!== FALSE" so that an override which returns nothing is still treated as success
            $ok = ($this->CheckConveyOut2($oDest) !== FALSE);
            $qoUpdSrce->OnIterate();
            $qoUpdDest->OnIterate();
        }

        $oHow->SetOkay($ok);
        return $oHow;
    }

    // 2026-04-01 I *almost* want to package these ops in a separate class...
    // $sGot = data currently pulled from input stream ($oSrce)
    // $sPut = data remaining to push to output stream ($oDest)
    // Both start as empty strings so they can be tested with strict (=== '') comparisons.
    private $sGot = '';
    private $sPut = '';

    /* 2026-04-08 no longer used, I think
    public function ConveyNow(StreamIface $oSrce, StreamIface $oDest) : OpConveyIface {
    $oSrce->Open();
    $oDest->Open();
    $qoUpdSrce = $oSrce->QOUpdater();
    $qoUpdDest = $oDest->QOUpdater();
    $oHow = $this->OOpConvey();
    $doILoop = $ok = $oSrce->HasMore();
    $qoUpdSrce->OnBefore();
    $qoUpdDest->OnBefore();
    $this->AmHere();
    $this->sGot = '';
    while ($doILoop) { // while there's input to process --
    // $sGot refill check:
    if (empty($this->sGot)) { // if buffer is empty --
    $this->CheckConveyIn($oSrce);
    $qoUpdSrce->OnIterate();
    $qoUpdDest->OnIterate();
    $doILoop = $oSrce->HasMore(); // if we didn't reach EoF, keep looping
    }
    // $sGot -> $sPut loop
    // -- repeatedly break $sGot into OBUFF_SIZE pieces and try to send/write them:
    #$doOLoop = TRUE; // do output loop
    $this->sPut = NULL; // just for rigor / clarity
    #while ($doOLoop && ($this->sGot !== '')) {
    if ($this->sGot !== '') {
    // This loop keeps trying to send pieces of $sGot (via $sPut) until finished or there's an error.
    // If $sPut is empty, it refills it from $sGot.
    // If $sGot is empty, we exit the loop.
    // 2026-02-22 TODO: maybe this should use stream_copy_to_stream().
    #$this->AmHereShort('$oSrce: '.$oSrce->VIEW_Inline());
    if (empty($this->sPut)) {
    $this->CheckConveyOut1();
    $qoUpdSrce->OnIterate();
    $qoUpdDest->OnIterate();
    }
    if ($this->sPut !== '') { // (minor optimization) if the piece hasn't been completely sent --
    $this->CheckConveyOut2($oDest);
    }
    $qoUpdSrce->OnIterate();
    $qoUpdDest->OnIterate();
    }
    // prep for next loop-thru:
    if (!$oSrce->HasMore()) { $doILoop = FALSE; }
    }
    $qoUpdSrce->OnAfter();
    $qoUpdDest->OnAfter();
    $oSrce->Shut();
    $oDest->Shut();
    $oHow->SetOkay($ok); // NOTE 2026-01-02: This will error if there's no data to send. Is that a problem?
    return $oHow;
    }
    */

    // ACTION: pull up to IBUFF_SIZE bytes from $oSrce and append them to the input buffer ($sGot)
    // NOTE(review): the AmHere() call passes the *entire* buffer (up to IBUFF_SIZE bytes) to the
    //  diagnostic helper on every read; that looks like leftover tracing -- confirm before keeping it.
    protected function CheckConveyIn(StreamIface $oSrce) {
        $oGot = $oSrce->PullBytes(self::IBUFF_SIZE);
        $qsGot = $oGot->QData();
        $this->sGot .= $qsGot->GetItNz();
        $this->AmHere('RECEIVED=['.$this->sGot.']');
    }

    // ACTION: move the next piece (at most OBUFF_SIZE bytes) from the front of $sGot into $sPut
    // ASSUMES: $sPut is empty -- any previous contents are overwritten.
    protected function CheckConveyOut1() {
        $this->sPut = substr($this->sGot,0,self::OBUFF_SIZE);   // get piece to send
        // Remove exactly those bytes from the input buffer -- once. (Trimming twice drops data.)
        $this->sGot = substr($this->sGot,strlen($this->sPut));
    }

    // ACTION: try to push the pending piece ($sPut) to $oDest; on success, drop the bytes the
    //  destination reports as done, keeping any remainder in $sPut for the next attempt.
    //  On failure, $sPut is left untouched and the error is logged.
    // RETURNS: TRUE if the push was reported okay, FALSE if not.
    //  (No return type is declared, so existing overrides which return nothing remain compatible.)
    protected function CheckConveyOut2(StreamIface $oDest) {
        $nlTry = strlen($this->sPut);               // how many bytes are we trying?
        $oOp = $oDest->PushBytes($this->sPut);      // try to send them

        if (!$oOp->GetOkay()) {
            // error when sending -- report it so the caller can stop
            $this->LogEntry("ERROR sending $nlTry bytes");
            $this->AmHere("ERROR sending $nlTry bytes");
            return FALSE;
        }

        $nlDone = $oOp->BytesDone();                // # bytes sent
        $this->sPut = substr($this->sPut,$nlDone);  // remove bytes-accepted from burst-buffer
        $nlRem = $nlTry - $nlDone;                  // # bytes which should still be pending
        $this->HardAssert($nlRem === strlen($this->sPut),'Internal calculation bug!');
        return TRUE;
    }

    // ++ I/O: UI ++ //

    // RETURNS: one-line description of the connection, in the form "{source} -> {destination}"
    public function DescribeInline() : string {
        return
            '{'
            .$this->OSrceStream()->VIEW_Inline()
            .'} -> {'
            .$this->ODestStream()->VIEW_Inline()
            .'}';
    }

    // -- I/O -- //
    // ++ DIAGS: screen ++ //

    // RETURNS: two-line description of the connection (source, then destination), each line CRLF-terminated
    public function VIEW_AsBlock() : string {
        return
            '> SRCE: '.$this->OSrceStream()->VIEW_Inline().CRLF.
            '> DEST: '.$this->ODestStream()->VIEW_Inline().CRLF;
    }

    // ++ DIAGS: logs ++ //

    // ACTION: write $s to the shared logger, if one has been set; otherwise do nothing
    protected function LogEntry(string $s) {
        if (is_object(self::$oLog)) {
            self::$oLog->WriteEntry($s);
        }
    }

    // -- DIAGS -- //
}