Flow Adapter: AMP

This adapter provides asynchronous local pipeline server and worker elements implemented on top of amphp.

The following communication protocols are supported:

  • TCP/IP (only local) –
  • Unix Domain Socket – unix:///var/run/etl.sock
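
To make the two address forms concrete, here is a minimal sketch that uses only PHP's built-in stream functions. It is independent of amphp and of this adapter's internals. The helper name `echo_over_local_socket`, the temporary socket file name, and the TCP address `127.0.0.1:0` (port 0 lets the OS pick a free port) are assumptions made for illustration only.

```php
<?php

declare(strict_types=1);

/**
 * Hypothetical helper (not part of this adapter): sends one line from a
 * client to a server over a local stream socket and returns what arrived.
 */
function echo_over_local_socket(string $address, string $message): string
{
    $isUnix = strncmp($address, 'unix://', 7) === 0;

    // Binds and listens on the given local address.
    $server = stream_socket_server($address, $errno, $errstr);
    if ($server === false) {
        throw new RuntimeException("Cannot listen on {$address}: {$errstr}");
    }

    // With TCP port 0 the OS picks a free port, so read back the bound address.
    $target = $isUnix ? $address : 'tcp://' . stream_socket_get_name($server, false);

    $client = stream_socket_client($target, $errno, $errstr, 1.0);
    if ($client === false) {
        fclose($server);
        throw new RuntimeException("Cannot connect to {$target}: {$errstr}");
    }

    // The connection is already queued in the listen backlog at this point.
    $connection = stream_socket_accept($server, 1.0);
    if ($connection === false) {
        fclose($client);
        fclose($server);
        throw new RuntimeException("Cannot accept connection on {$target}");
    }

    fwrite($client, $message . "\n");
    $received = fgets($connection);

    fclose($connection);
    fclose($client);
    fclose($server);

    if ($isUnix) {
        // Closing the server does not remove the socket file.
        @unlink(substr($address, 7));
    }

    return rtrim((string) $received, "\n");
}

$socketFile = sys_get_temp_dir() . '/etl-example-' . getmypid() . '.sock';

echo echo_over_local_socket('unix://' . $socketFile, 'hello over unix'), PHP_EOL;
echo echo_over_local_socket('tcp://127.0.0.1:0', 'hello over tcp'), PHP_EOL;
```

A Unix domain socket is addressed by a file path and needs that file cleaned up afterwards, while a local TCP socket is addressed by host and port. Both are reachable only from the same machine when bound as above.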


composer require flow-php/etl-adapter-amphp:1.x@dev

Example usage:


use Flow\ETL\Adapter\CSV\League\CSVExtractor;
use Flow\ETL\Adapter\Doctrine\DbalLoader;
use Flow\ETL\Monitoring\Memory\Consumption;
use Flow\ETL\Pipeline\LocalSocketPipeline;
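// Note: these two classes are imported from the ReactPHP adapter namespace;
// when using this adapter, import the AMP equivalents shipped with your installed version.
use Flow\ETL\Async\ReactPHP\Worker\ChildProcessLauncher;
use Flow\ETL\Async\ReactPHP\Server\SocketServer;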
use Flow\ETL\DSL\Transform;
use Flow\ETL\Flow;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Psr\Log\LogLevel;

require __DIR__ . '/vendor/autoload.php';

$logger = new Logger('server');
$logger->pushHandler(new StreamHandler("php://stdout", LogLevel::DEBUG, false));
$logger->pushHandler(new StreamHandler("php://stderr", LogLevel::ERROR, false));

(new Flow)
    ->read(new CSVExtractor($path = __DIR__ . '/data/dataset.csv'))
    // Assumption: the socket pipeline is attached through Flow::pipeline().
    ->pipeline(
        new LocalSocketPipeline(
            SocketServer::unixDomain(__DIR__ . "/var/run/", $logger),
            new ChildProcessLauncher(__DIR__ . "/vendor/bin/worker-amp", $logger),
            $workers = 8
        )
    )
    ->rows(Transform::string_concat(['name', 'last_name'], ' ', 'name'))
    // $tableName and $dbConnectionParams must be defined by the caller.
    ->load(new DbalLoader($tableName, $chunkSize = 1000, $dbConnectionParams))
    ->run();

This adapter comes with a built-in worker CLI application,
but feel free to create a custom one.
Customizing the worker lets you adjust the logger or autoloader.

