
Flow Adapter: AMP

This adapter provides async local pipeline server/worker elements implemented on amphp.

The following communication protocols are supported:

  • TCP/IP (only local) –
  • Unix Domain Socket – unix:///var/run/etl.sock
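
To make the difference between the two transports concrete, here is a minimal sketch that opens one listening socket of each kind. It uses only PHP's built-in stream functions, not this adapter's API. The helper name `listen_on` and the temporary socket path are assumptions made for illustration.

```php
<?php
// Illustrative only: plain PHP streams, not the adapter's own server classes.

/**
 * Open a listening socket on the given address.
 * Returns the server resource and the name it is bound to.
 */
function listen_on(string $address): array
{
    $server = stream_socket_server($address, $errno, $errstr);
    if ($server === false) {
        throw new RuntimeException("Cannot listen on {$address}: {$errstr} ({$errno})");
    }

    return [$server, stream_socket_get_name($server, false)];
}

// TCP bound to the loopback interface only; port 0 lets the OS pick a free port.
[$tcp, $tcpName] = listen_on('tcp://127.0.0.1:0');

// Unix domain socket backed by a file path (hypothetical path in the temp dir).
$socketPath = sys_get_temp_dir() . '/etl-example-' . getmypid() . '.sock';
@unlink($socketPath); // remove a stale socket file left by a previous run
[$unix, $unixName] = listen_on('unix://' . $socketPath);

echo "TCP listening on {$tcpName}\n";
echo "Unix socket listening on {$unixName}\n";

// Clean up: close both servers and remove the socket file.
fclose($tcp);
fclose($unix);
unlink($socketPath);
```

A Unix domain socket is addressed by a filesystem path and never leaves the machine, while the TCP variant stays local only because it is bound to the loopback address.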


This repo is READ ONLY. To contribute to the Flow PHP project, please
open a PR against the flow monorepo.

Changes merged to the monorepo are automatically propagated to the sub-repositories.


Installation:

composer require flow-php/etl-adapter-amphp:1.x@dev

Example usage:


<?php

use Flow\ETL\Adapter\CSV\League\CSVExtractor;
use Flow\ETL\Adapter\Doctrine\DbalLoader;
use Flow\ETL\Monitoring\Memory\Consumption;
use Flow\ETL\Pipeline\LocalSocketPipeline;
// NOTE: the original example imports the next two classes from the ReactPHP
// namespace; check which namespace the installed adapter actually ships.
use Flow\ETL\Async\ReactPHP\Worker\ChildProcessLauncher;
use Flow\ETL\Async\ReactPHP\Server\SocketServer;
use Flow\ETL\DSL\Transform;
use Flow\ETL\Flow;
use Monolog\Handler\StreamHandler;
use Monolog\Logger;
use Psr\Log\LogLevel;

require __DIR__ . '/vendor/autoload.php';

// Errors go to stderr; everything below ERROR goes to stdout.
$logger = new Logger('server');
$logger->pushHandler(new StreamHandler("php://stdout", LogLevel::DEBUG, false));
$logger->pushHandler(new StreamHandler("php://stderr", LogLevel::ERROR, false));

// $tableName and $dbConnectionParams must be defined by the caller.
(new Flow)
    ->read(new CSVExtractor($path = __DIR__ . '/data/dataset.csv'))
    // Assumption: the original snippet was truncated here; the pipeline is
    // attached through pipeline() so that rows are processed by the workers.
    ->pipeline(
        new LocalSocketPipeline(
            SocketServer::unixDomain(__DIR__ . "/var/run/", $logger),
            new ChildProcessLauncher(__DIR__ . "/vendor/bin/worker-amp", $logger),
            $workers = 8
        )
    )
    ->rows(Transform::string_concat(['name', 'last_name'], ' ', 'name'))
    ->load(new DbalLoader($tableName, $chunkSize = 1000, $dbConnectionParams))
    ->run();

This adapter comes with a built-in worker CLI application,
but feel free to create a custom one.
Customizing the worker lets you adjust the logger or the autoloader.
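
As a rough illustration of the autoloader part, a custom worker script might start by locating Composer's autoloader before doing anything else. The sketch below is not taken from the built-in worker. The function name `locate_autoloader` and the candidate paths are hypothetical, and wiring the worker to the adapter's own classes is left out because those class names are not given here.

```php
<?php
// Hypothetical helper for a custom worker script; not part of the adapter.

/**
 * Return the first readable autoloader among the candidate paths,
 * or null when none of them exists.
 *
 * @param string[] $candidates absolute paths to try, in order of preference
 */
function locate_autoloader(array $candidates): ?string
{
    foreach ($candidates as $candidate) {
        if (is_file($candidate) && is_readable($candidate)) {
            return $candidate;
        }
    }

    return null;
}
```

A script would typically call it with paths such as `__DIR__ . '/../vendor/autoload.php'` (project checkout) and `__DIR__ . '/../../../autoload.php'` (package installed as a dependency), then `require` the result or exit with an error when it is null.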
