Processing API data

Drupal 8: Consumption of a Third-Party API

For a recent project, we were tasked to consume the client's internal data from a custom API. Now, this scenario was lucky for us, the API provides a total item count of about 5000, but when queried with a start date, it provides all revisions of items between then and now. The premise was that the data was to be downloaded at regular intervals, so that content editors didn't need to copy and past to keep product information up to date. Updates to their external dataset would be done at a steady pace, like a handful of times a week, and would include a low number of changes, around 50 to 100 items a change. Since Drupal is known to be cumbersome with content saving, the idea of pulling in and saving data from for 5000 nodes in one go during cron didn't seem feasible. So, I premised that we could do a full import via an administration page and cron can keep the data up to date utilizing the cron queue with help from Ultimate Cron. For the sake of this blog, the API will be referred to as the Iguana API, the products will be Tea, and client, the Iguana Tea Company.

Initial Infrastructure

So, first I stubbed out administrative pages consisting of an overview page, an import form, and a settings form. The overview will be used to display the current health of the API. The import form will trigger a batch job to import and save all of the API's Tea data. The settings form for configuring access to the API and such.

drupal/modules/custom/iguana/iguana.routing.yml:
iguana.overview:
  path: '/admin/config/services/iguana'
  defaults:
    _controller: '\Drupal\iguana\Controller\IguanaOverviewController::showOverview'
    _title: 'Iguana API Status Report'
  requirements:
    _permission: 'iguana tea import'
  options:
    _admin_route: TRUE

iguana.tea_import:
  path: '/admin/config/services/iguana/tea-import'
  defaults:
    _form: '\Drupal\iguana\Form\IguanaTeaImportForm'
    _title: 'Iguana API: Tea Import'
  requirements:
    _permission: 'iguana tea import'
  options:
    _admin_route: TRUE

iguana.configuration:
  path: '/admin/config/services/iguana/config'
  defaults:
    _form: '\Drupal\iguana\Form\IguanaConfigurationForm'
    _title: 'Iguana API Configuration'
  requirements:
    _permission: 'iguana admin config'
  options:
    _admin_route: TRUE

Also, for a good administrative user experience, I added menu links and tabs:

drupal/modules/custom/iguana/iguana.links.menu.yml:
iguana.overview:
  title: Iguana API Status
  route_name: iguana.overview
  description: 'Configuration & information for the Iguana API integration.'
  parent: system.admin_config_services

iguana.tea_import:
  title: Tea Import
  route_name: iguana.tea_import
  parent: iguana.overview

iguana.configuration:
  title: Configure
  route_name: iguana.configuration
  parent: iguana.overview
drupal/modules/custom/iguana/iguana.links.task.yml:
iguana.overview:
  title: API Status
  route_name: iguana.overview
  base_route: iguana.overview

iguana.tea_import:
  title: Tea Import
  route_name: iguana.tea_import
  base_route: iguana.overview

iguana.configuration:
  title: Configure
  route_name: iguana.configuration
  base_route: iguana.overview

From previous experience, I designed this process to consist of two separate operation, save API data locally to a database table and then extract the downloaded data to Drupal nodes. It was explicitly defined that each item was uniquely identified by the field, gid or GID. So, the database table will be used for storing the raw API data keyed by the GID field.

drupal/modules/custom/iguana/iguana.install:
<?php
 
use Drupal\Core\Database\Database;
 
/**
 * Implements hook_schema().
 */
function iguana_schema() {
  $schema['iguana_tea_previous'] = [
    'description' => 'Preserves the raw data downloaded from the Iguana API for comparison.',
    'fields'      => [
      'gid' => [
        'description' => 'The primary unique ID for Iguana Tea data.',
        'type'        => 'int',
        'size'        => 'big',
        'not null'    => TRUE,
        'default'     => 0,
      ],
      'data' => [
        'description' => 'The full data of the Tea.',
        'type'        => 'blob',
        'size'        => 'big',
      ],
    ],
    'primary key' => ['gid'],
  ];
 
  $schema['iguana_tea_staging'] = [
    'description' => 'Stores the raw data downloaded from the Iguana API.',
    'fields'      => [
      'gid' => [
        'description' => 'The primary unique ID for Iguana Tea data.',
        'type'        => 'int',
        'size'        => 'big',
        'not null'    => TRUE,
        'default'     => 0,
      ],
      'data' => [
        'description' => 'The full data of the Tea.',
        'type'        => 'blob',
        'size'        => 'big',
      ],
    ],
    'primary key' => ['gid'],
  ];
 
  return $schema;
}

Now, to connect to the Iguana API, I needed to know the credentials to access it, so that meant building out the configuration form.

drupal/modules/custom/iguana/src/Form/IguanaConfigurationForm.php:
<?php
 
namespace Drupal\iguana\Form;
 
use Drupal\Core\Form\ConfigFormBase;
use Symfony\Component\HttpFoundation\Request;
use Drupal\Core\Form\FormStateInterface;
 
/**
 * Defines a form that configures forms module settings.
 */
class IguanaConfigurationForm extends ConfigFormBase {
 
  /**
   * {@inheritdoc}
   */
  public function getFormId() {
    return 'iguana_admin_settings';
  }
 
  /**
   * {@inheritdoc}
   */
  protected function getEditableConfigNames() {
    return [
      'iguana.settings',
    ];
  }
 
  /**
   * {@inheritdoc}
   */
  public function buildForm(array $form, FormStateInterface $form_state, Request $request = NULL) {
    $config = $this->config('iguana.settings');
    $state  = \Drupal::state();
    $form["#attributes"]["autocomplete"] = "off";
    $form['iguana'] = array(
      '#type'  => 'fieldset',
      '#title' => $this->t('Iguana settings'),
    );
    $form['iguana']['url'] = array(
      '#type'          => 'textfield',
      '#title'         => $this->t('Iguana API URL'),
      '#default_value' => $config->get('iguana.url'),
    );
    $form['iguana']['username'] = array(
      '#type'          => 'textfield',
      '#title'         => $this->t('Username'),
      '#default_value' => $config->get('iguana.username'),
    );
    $form['iguana']['password'] = array(
      '#type'          => 'textfield',
      '#title'         => $this->t('Password'),
      '#default_value' => '',
      '#description'   => t('Leave blank to make no changes, use an invalid string to disable if need be.')
    );
    $form['iguana']['public_key'] = array(
      '#type'          => 'textfield',
      '#title'         => $this->t('Public Key'),
      '#default_value' => $config->get('iguana.public_key'),
    );
    $form['iguana']['private_key'] = array(
      '#type'          => 'textfield',
      '#title'         => $this->t('Private Key'),
      '#default_value' => '',
      '#description'   => t('Leave blank to make no changes, use an invalid string to disable if need be.')
    );
    $form['iguana']['division'] = array(
      '#type'          => 'textfield',
      '#title'         => $this->t('Division'),
      '#default_value' => $config->get('iguana.division'),
    );
    $form['iguana']['territory'] = array(
      '#type'          => 'textfield',
      '#title'         => $this->t('Territory'),
      '#default_value' => $config->get('iguana.territory'),
    );
    $nums   = [
      5, 10, 25, 50, 75, 100, 150, 200, 250, 300, 400, 500, 600, 700, 800, 900,
    ];
    $limits = array_combine($nums, $nums);
    $form['cron_download_limit'] = [
      '#type'          => 'select',
      '#title'         => t('Cron API Download Throttle'),
      '#options'       => $limits,
      '#default_value' => $state->get('iguana.cron_download_limit', 100),
    ];
    $form['cron_process_limit'] = [
      '#type'          => 'select',
      '#title'         => t('Cron Queue Node Process Throttle'),
      '#options'       => $limits,
      '#default_value' => $state->get('iguana.cron_process_limit', 25),
    ];
    return parent::buildForm($form, $form_state);
  }
 
  /**
   * {@inheritdoc}
   */
  public function submitForm(array &$form, FormStateInterface $form_state) {
    $values = $form_state->getValues();
    $config = $this->config('iguana.settings');
    $state  = \Drupal::state();
    $config->set('iguana.url', $values['url']);
    $config->set('iguana.username', $values['username']);
    $config->set('iguana.public_key', $values['public_key']);
    $config->set('iguana.division', $values['division']);
    $config->set('iguana.territory', $values['territory']);
    $config->save();
    if (!empty($values['private_key'])) {
      $state->set('iguana.private_key', $values['private_key']);
    }
    if (!empty($values['password'])) {
      $state->set('iguana.password', $values['password']);
    }
    $state->set('iguana.cron_download_limit', $values['cron_download_limit']);
    $state->set('iguana.cron_process_limit', $values['cron_process_limit']);
  }
 
}

Specifically note how sensitive data, such as private keys, is saved via the State API instead of the Config API.

Connection Class

Next, whether the overview page is displaying the API health or the batch operation is downloading, I needed a class to simplify doing all of the basic connection operations.

drupal/modules/custom/iguana/src/IguanaConnection.php:
<?php
 
namespace Drupal\iguana;
 
use Drupal\Core\Url;
use GuzzleHttp\Client as GuzzleClient;
use GuzzleHttp\Psr7\Request as GuzzleRequest;
 
 
/**
 * Class IguanaConnection
 *
 * @package Drupal\iguana
 */
class IguanaConnection {
 
  /**
   * @var string Iguana API version to use
   */
  protected $version = 'v1';
 
  /**
   * @var string API querying method
   */
  protected $method  = 'GET';
 
  /**
   * @var \Drupal\Core\Config\Config Iguana settings
   */
  protected $config  = NULL;
 
  /**
   * @var array Store sensitive API info such as the private_key & password
   */
  protected $sensitiveConfig = [];
 
  /**
   * IguanaConnection constructor.
   */
  public function __construct() {
    $this->config = \Drupal::config('iguana.settings');
  }
 
  /**
   * Get configuration or state setting for this Iguana integration module.
   *
   * @param string $name this module's config or state.
   *
   * @return mixed
   */
  protected function getConfig($name) {
    $sensitive = [
      'private_key',
      'password',
    ];
    if (in_array($name, $sensitive)) {
      if (isset($this->sensitiveConfig[$name])) {
        return $this->sensitiveConfig[$name];
      }
      $this->sensitiveConfig[$name] = \Drupal::state()
        ->get('iguana.' . $name);
      return $this->sensitiveConfig[$name];
    }
    return $this->config->get('iguana.' . $name);
  }
 
  /**
   * Pings the Iguana API for data.
   *
   * @param string $endpoint division endpoint to query
   * @param array  $options for Url building
   *
   * @return object
   */
  public function queryEndpoint($endpoint, $options = []) {
    try {
      $response = $this->callEndpoint($endpoint, $options);
      return json_decode($response->getBody());
    } catch (\Exception $e) {
      watchdog_exception('iguana', $e);
      return (object) [
        'response_type' => '',
        'response_data' => [],
        'pagination'    => (object) [
          'total_count'    => 0,
          'current_limit'  => 0,
          'current_offset' => 0,
        ],
      ];
    }
  }
 
  /**
   * Call the Iguana API endpoint.
   *
   * @param string $endpoint
   * @param array  $options
   *
   * @return \Psr\Http\Message\ResponseInterface
   */
  public function callEndpoint($endpoint, $options = []) {
    $headers = $this->generateHeaders($this->requestUri($endpoint));
    $url     = isset($options['next_page']) ?
      $options['next_page'] : $this->requestUrl($endpoint, $options)
        ->toString();
    $client  = new GuzzleClient();
    $request = new GuzzleRequest($this->method, $url, $headers);
    return $client->send($request, ['timeout' => 30]);
  }
 
  /**
   * Build the URI part of the URL based on the endpoint and configuration.
   *
   * @param string $endpoint to the API data
   *
   * @return string
   */
  protected function requestUri($endpoint) {
    $division = $this->getConfig('division');
    return '/services/rest/' . $this->version . '/json/' . $division
           . '/' . $endpoint . '/';
  }
 
  /**
   * Build a Url object of the URL data to query the Iguana API.
   *
   * @param string $endpoint to the API data
   * @param array  $options to build the URL such as 'query_options'
   *
   * @return \Drupal\Core\Url
   */
  protected function requestUrl($endpoint, $options = []) {
    $url         = $this->getConfig('url');
    $public_key  = $this->getConfig('public_key');
    $territory   = $this->getConfig('territory');
    $request_uri = $this->requestUri($endpoint);
    $limit       = isset($options['limit']) ? $options['limit'] : 25;
    $offset      = 0;
    $start_time  = isset($options['start_time']) ? $options['start_time'] : NULL;
    $end_time    = isset($options['end_time']) ? $options['end_time'] : NULL;
    $url_query   = [
      'api_key'   => $public_key,
      'limit'     => $limit,
      'offset'    => $offset,
      'territory' => $territory,
    ];
 
    if (isset($start_time)) {
      $start_date             = new \DateTime('@' . $start_time);
      $url_query['startdate'] = $start_date->format('Y-m-d');
    }
 
    if (isset($end_time)) {
      $end_date             = new \DateTime('@' . $end_time);
      $url_query['enddate'] = $end_date->format('Y-m-d');
    }
 
    if (!empty($options['url_query']) && is_array($options['url_query'])) {
      $url_query = array_merge($url_query, $options['url_query']);
    }
 
    return Url::fromUri($url . $request_uri, [
      'query' => $url_query,
    ]);
  }
 
  /**
   * Build an array of headers to pass to the Iguana API such as the
   * signature and account.
   *
   * @param string $request_uri to the API endpoint
   *
   * @return array
   */
  protected function generateHeaders($request_uri) {
    $username       = $this->getConfig('username');
    $password       = $this->getConfig('password');
    $private_key    = $this->getConfig('private_key');
    $request_method = 'GET';
    // Date must be UTC or signature will be invalid
    $original_timezone = date_default_timezone_get();
    date_default_timezone_set('UTC');
    $message = $request_uri . $request_method . date('mdYHi');
    $headers = [
      'x-signature' => $this->generateXSignature($message, $private_key),
      'x-account'   => $this->generateXAccount($username, $password),
    ];
    date_default_timezone_set($original_timezone);
    return $headers;
  }
 
  /**
   * Builds a hash for the x-signature to send to the Iguana API according to
   * specifications.
   *
   * @param string $message
   * @param string $private_key
   *
   * @return string
   */
  protected function generateXSignature($message, $private_key) {
    return some_encoding_process($message, $private_key);
  }
 
  /**
   * Builds a hash for the x-account to send to the Iguana API according to
   * specifications.
   * @param string $username
   * @param string $password
   *
   * @return string
   */
  protected function generateXAccount($username, $password) {
    return some_other_encoding_process($username, $password);
  }
 
}

So, this reduces the process for getting API data down to two methods callEndpoint() and queryEndpoint(). The latter simply calls the former and cleans up the data for processing, where the former returns the whole response object. Also, note the method, getConfig(), that simplifies getting settings whether they are stored in yaml configuration or state.

API Health Overview

Now that I had a connection class, I could test it out on the API's overview page.

drupal/modules/custom/iguana/src/Controller/IguanaOverviewController.php:
<?php
 
namespace Drupal\iguana\Controller;
 
use Drupal\Core\Controller\ControllerBase;
use Drupal\iguana\IguanaConnection;
 
/**
 * Provides controller methods for the Iguana API integration overview.
 */
class IguanaOverviewController extends ControllerBase {
 
  /**
   * {@inheritdoc}
   */
  public function showOverview() {
    $build = [];
 
    list($response, $json) = $this->pingEndpoint($build);
    // If response data was built and returned, display it with a sample of the
    // objects returned
    if (isset($response)) {
      $build['response'] = [
        '#theme' => 'item_list',
        '#title' => t('Response: @r', [
          '@r' => $response->getReasonPhrase(),
        ]),
        '#items' => [
          'code' => t('Code: @c', ['@c' => $response->getStatusCode()]),
        ],
      ];
    }
    if (isset($json)) {
      $build['response_data'] = [
        '#theme' => 'item_list',
        '#title' => t('Response Data:'),
        '#items' => [
          'response-type' => t('Response Type: @t', [
            '@t' => $json->response_type,
          ]),
          'total-count' => t('Total Count: @c', [
            '@c' => $json->pagination->total_count,
          ]),
        ],
      ];
      $this->displayPaginationData($json, $build);
      $this->displayDataSample($json, $build);
    }
    return $build;
  }
 
  /**
   * Ping the Iguana API for basic data.
   *
   * @param array $build render array
   *
   * @return array of [$response, $json]
   */
  protected function pingEndpoint(&$build) {
    $connection = new IguanaConnection();
    $response   = NULL;
    $json       = NULL;
    try {
      $response = $connection->callEndpoint('teasDetailFull', [
        'limit'     => 10,
        'url_query' => [
          'sort' => 'gid asc',
        ]
      ]);
      $json = json_decode($response->getBody());
    } catch (\GuzzleHttp\Exception\ServerException $e) {
      // Handle their server-side errors
      $build['server_error'] = [
        '#theme' => 'item_list',
        '#title' => t('Server Exception: @r', [
          '@r' => $e->getResponse()->getReasonPhrase(),
        ]),
        '#items' => [
          'url'  => t('URL: @u', ['@u' => $e->getRequest()->getUri()]),
          'code' => t('Code: @c', ['@c' => $e->getResponse()->getStatusCode()]),
        ],
      ];
      $build['exception'] = [
        '#markup' => $e->getMessage(),
      ];
      watchdog_exception('iguana', $e);
    } catch (\GuzzleHttp\Exception\ClientException $e) {
      // Handle client-side error (e.g., authorization failures)
      $build['client_error'] = [
        '#theme' => 'item_list',
        '#title' => t('Client Exception: @r', [
          '@r' => $e->getResponse()->getReasonPhrase(),
        ]),
        '#items' => [
          'url'  => t('URL: @u', ['@u' => $e->getRequest()->getUri()]),
          'code' => t('Code: @c', ['@c' => $e->getResponse()->getStatusCode()]),
        ],
      ];
      $build['exception'] = [
        '#markup' => $e->getMessage(),
      ];
      watchdog_exception('iguana', $e);
    } catch (\Exception $e) {
      // Handle general PHP exemptions
      $build['php_error'] = [
        '#theme' => 'item_list',
        '#title' => t('PHP Exception'),
        '#items' => [
          'code' => t('Code: @c', ['@c' => $e->getCode()]),
        ],
      ];
      $build['exception'] = [
        '#markup' => $e->getMessage(),
      ];
      watchdog_exception('iguana', $e);
    }
    return [$response, $json];
  }
 
  /**
   * Build out any available data for pagination.
   *
   * @param object $json
   * @param array  $build render array
   */
  protected function displayPaginationData($json, &$build) {
    if (isset($json->pagination->current_limit)) {
      $build['response_data']['#items']['current-limit'] = t('Current Limit: @l', [
        '@l' => $json->pagination->current_limit,
      ]);
    }
    if (isset($json->pagination->current_offset)) {
      $build['response_data']['#items']['current-offset'] = t('Current Offset: @o', [
        '@o' => $json->pagination->current_offset,
      ]);
    }
    if (isset($json->pagination->first)) {
      $build['response_data']['#items']['first'] = t('First URL: @f', [
        '@f' => $json->pagination->first,
      ]);
    }
    if (isset($json->pagination->prev)) {
      $build['response_data']['#items']['prev'] = t('Previous URL: @p', [
        '@p' => $json->pagination->prev,
      ]);
    }
    if (isset($json->pagination->next)) {
      $build['response_data']['#items']['next'] = t('Next URL: @n', [
        '@n' => $json->pagination->next,
      ]);
    }
    if (isset($json->pagination->last)) {
      $build['response_data']['#items']['last'] = t('Last URL: @l', [
        '@l' => $json->pagination->last,
      ]);
    }
  }
 
  /**
   * Build out a sample of the data returned.
   *
   * @param object $json
   * @param array  $build render array
   */
  protected function displayDataSample($json, &$build) {
    if (isset($json->response_data[0])) {
      $tea_data = $json->response_data[0];
      $build['tea_sample'] = [
        '#prefix' => '<pre>',
        '#markup' => print_r($tea_data, TRUE),
        '#suffix' => '</pre>',
      ];
    }
  }
 
}

The portions displaying the JSON data are unique to the Iguana API, but the pinging of the API and the error handling around it should be noted.

Batch API Operations

So, with the API returning some data for the overview page, building out the data processing with the Batch API operations was next. Much of what was be done here was easily replicated for cron since both process the data in small batch sizes. Batch operations work best when there is total count to work towards, so taking an example from the overview page, I started with the form body capturing that total count. This also allows for an error check, if the API returns zero as a total, the submit button can be disabled.

drupal/modules/custom/iguana/src/Form/IguanaTeaImportForm.php:
<?php
 
namespace Drupal\iguana\Form;
 
use Drupal\Core\Database\Database;
use Drupal\Core\Form\FormBase;
use Symfony\Component\HttpFoundation\Request;
use Drupal\Core\Form\FormStateInterface;
use Drupal\iguana\IguanaConnection;
use Drupal\iguana\IguanaTea;
 
/**
 * Defines a form that triggers batch operations to download and process Tea
 * data from the Iguana API.
 * Batch operations are included in this class as methods.
 */
class IguanaTeaImportForm extends FormBase {
 
  /**
   * {@inheritdoc}
   */
  public function getFormId() {
    return 'iguana_tea_import_form';
  }
 
  /**
   * {@inheritdoc}
   */
  public function buildForm(array $form, FormStateInterface $form_state, Request $request = NULL) {
    $connection = new IguanaConnection();
    $data       = $connection->queryEndpoint('teasDetailFull', [
      'limit'     => 1,
      'url_query' => [
        'sort' => 'gid asc',
      ]
    ]);
 
    if (empty($data->pagination->total_count)) {
      $msg  = 'A total count of Teas was not returned, indicating that there';
      $msg .= ' is a problem with the connection. See ';
      $msg .= '<a href="/admin/config/services/iguana">the Overview page</a>';
      $msg .= 'for more details.';
      drupal_set_message(t($msg), 'error');
    }
 
    $form['count_display'] = [
      '#type'  => 'item',
      '#title' => t('Teas Found'),
      'markup'  => [
        '#markup' => $data->pagination->total_count,
      ]
    ];
 
    $form['count'] = [
      '#type'  => 'value',
      '#value' => $data->pagination->total_count,
    ];
 
    $nums   = [
      5, 10, 25, 50, 75, 100, 150, 200, 250, 300, 400, 500, 600, 700, 800, 900,
    ];
    $limits = array_combine($nums, $nums);
    $desc   = 'This is the number of Teas the API should return each call ' .
      'as the operation pages through the data.';
    $form['download_limit'] = [
      '#type'          => 'select',
      '#title'         => t('API Download Throttle'),
      '#options'       => $limits,
      '#default_value' => 200,
      '#description'   => t($desc),
    ];
    $desc = 'This is the number of Teas to analyze and save to Drupal as ' .
      'the operation pages through the data.<br />This is labor intensive so ' .
      'usually a lower number than the above throttle';
    $form['process_limit'] = [
      '#type'          => 'select',
      '#title'         => t('Node Process Throttle'),
      '#options'       => $limits,
      '#default_value' => 50,
      '#description'   => t($desc),
    ];
 
    $form['actions']['#type'] = 'actions';
 
    $form['actions']['submit'] = [
      '#type'     => 'submit',
      '#value'    => t('Import All Teas'),
      '#disabled' => empty($data->pagination->total_count),
    ];
 
    return $form;
  }
 
  ...
}

Also, note the throttling options to provide site administrators the ability to adjust how many items will be processed in each batch iteration for the given operation. When all was said and done, I noticed that the site could handle downloading ten times as much data as it could handle saving to nodes.

The form's submit triggers two Batch API operations and protects against cron from interfering. Since the data processing will be take some time, I needed to set a state that the Tea importing is locked and clear out any cron jobs that may be currently queued.

drupal/modules/custom/iguana/src/Form/IguanaTeaImportForm.php (continued):
<?php
...
 
class IguanaTeaImportForm extends FormBase {
  ...
 
  /**
   * {@inheritdoc}
   */
  public function submitForm(array &$form, FormStateInterface $form_state) {
    $connection = Database::getConnection();
    $queue      = \Drupal::queue('iguana_tea_import_worker');
    $class      = 'Drupal\iguana\Form\IguanaTeaImportForm';
    $batch      = [
      'title'      => t('Downloading & Processing Iguana Tea Data'),
      'operations' => [
        [ // Operation to download all of the teas
          [$class, 'downloadTeas'], // Static method notation
          [
            $form_state->getValue('count', 0),
            $form_state->getValue('download_limit', 0),
          ],
        ],
        [ // Operation to process & save the tea data
          [$class, 'processTeas'], // Static method notation
          [
            $form_state->getValue('process_limit', 0),
          ],
        ],
      ],
      'finished' => [$class, 'finishedBatch'], // Static method notation
    ];
    batch_set($batch);
    // Lock cron out of processing while these batch operations are being
    // processed
    \Drupal::state()->set('iguana.tea_import_semaphore', TRUE);
    // Delete existing queue
    while ($worker = $queue->claimItem()) {
      $queue->deleteItem($worker);
    }
    // Clear out the staging table for fresh, whole data
    $connection->truncate('iguana_tea_staging')->execute();
  }
 
  ...
}

Note that for code cleanliness and maintainability, the operation and finished functions are static methods within this form class.

The first batch operation queries the API for limited number of Tea items and then saves them straight to the database table iguana_tea_staging while cycling through the API pages. Since any point in this batch can potentially be long running, I always build a $context['message'] to let the administrator know that something is processed so that they don't die of boredom.

drupal/modules/custom/iguana/src/Form/IguanaTeaImportForm.php (continued):
<?php
...
 
class IguanaTeaImportForm extends FormBase {
  ...
 
  /**
   * Batch operation to download all of the Tea data from Iguana and store
   * it in the iguana_tea_staging database table.
   *
   * @param int   $api_count
   * @param array $context
   */
  public static function downloadTeas($api_count, $limit, &$context) {
    $database = Database::getConnection();
    if (!isset($context['sandbox']['progress'])) {
      $context['sandbox'] = [
        'progress' => 0,
        'limit'    => $limit,
        'max'      => $api_count,
      ];
      $context['results']['downloaded'] = 0;
    }
    $sandbox = &$context['sandbox'];
 
    $iguana = new IguanaConnection();
    $data   = $iguana->queryEndpoint('teasDetailFull', [
      'limit'     => $sandbox['limit'],
      'url_query' => [
        'offset' => (string) $sandbox['progress'],
        'sort'   => 'gid asc',
      ],
    ]);
 
    foreach ($data->response_data as $tea_data) {
      // Check for empty or non-numeric GIDs
      if (empty($tea_data->gid)) {
        $msg = t('Empty GID at progress @p for the data:', [
          '@p' => $sandbox['progress'],
        ]);
        $msg .= '<br /><pre>' . print_r($tea_data, TRUE) . '</pre>';
        \Drupal::logger('iguana')->warning($msg);
        $sandbox['progress']++;
        continue;
      } elseif (!is_numeric($tea_data->gid)) {
        $msg = t('Non-numeric GID at progress progress @p for the data:', [
          '@p' => $sandbox['progress'],
        ]);
        $msg .= '<br /><pre>' . print_r($tea_data, TRUE) . '</pre>';
        \Drupal::logger('iguana')->warning($msg);
        $sandbox['progress']++;
        continue;
      }
      // Store the data
      $database->merge('iguana_tea_staging')
        ->key(['gid' => (int) $tea_data->gid])
        ->insertFields([
          'gid'  => (int) $tea_data->gid,
          'data' => serialize($tea_data),
        ])
        ->updateFields(['data' => serialize($tea_data)])
        ->execute()
      ;
      $context['results']['downloaded']++;
      $sandbox['progress']++;
      // Build a message so this isn't entirely boring for admins
      $context['message'] = '<h2>' . t('Downloading API data...') . '</h2>';
      $context['message'] .= t('Queried @c of @t Tea entries.', [
        '@c' => $sandbox['progress'],
        '@t' => $sandbox['max'],
      ]);
    }
 
    if ($sandbox['max']) {
      $context['finished'] = $sandbox['progress'] / $sandbox['max'];
    }
    // If completely done downloading, set the last time it was done, so that
    // cron can keep the data up to date with smaller queries
    if ($context['finished'] >= 1) {
      $last_time = \Drupal::time()->getRequestTime();
      \Drupal::state()->set('iguana.tea_import_last', $last_time);
    }
  }
 
  ...
}

Note that after the whole operation is finished, the iguana.tea_import_last state is set to log when the data was last downloaded, this will be used for the cron portion.

With all the data downloaded, the next operation takes each entry and attempts to convert it into Drupal node data, which is pretty unique to the client's needs, so that class's inner workings is omitted.

drupal/modules/custom/iguana/src/Form/IguanaTeaImportForm.php (continued):
<?php
...
 
class IguanaTeaImportForm extends FormBase {
  ...
 
  /**
   * Batch operation to extra data from the iguana_tea_staging table and
   * save it to a new node or one found via GID.
   *
   * @param array $context
   */
  public static function processTeas($limit, &$context) {
    $connection = Database::getConnection();
    if (!isset($context['sandbox']['progress'])) {
      $context['sandbox'] = [
        'progress' => 0,
        'limit'    => $limit,
        'max'      => (int)$connection->select('iguana_tea_staging', 'its')
          ->countQuery()->execute()->fetchField(),
      ];
      $context['results']['teas'] = 0;
      $context['results']['nodes']  = 0;
      // Count new versus existing
      $context['results']['nodes_inserted'] = 0;
      $context['results']['nodes_updated']  = 0;
    }
    $sandbox = &$context['sandbox'];
 
    $query = $connection->select('iguana_tea_staging', 'its')
      ->fields('its')
      ->range(0, $sandbox['limit'])
    ;
    $results = $query->execute();
 
    foreach ($results as $row) {
      $gid        = (int) $row->gid;
      $tea_data   = unserialize($row->data);
      $tea        = new IguanaTea($tea_data);
      $node_saved = $tea->processTea(); // Custom data-to-node processing
 
      $connection->merge('iguana_tea_previous')
        ->key(['gid' => $gid])
        ->insertFields([
          'gid'  => $gid,
          'data' => $row->data,
        ])
        ->updateFields(['data' => $row->data])
        ->execute()
      ;
 
      $query = $connection->delete('iguana_tea_staging');
      $query->condition('gid', $gid);
      $query->execute();
 
      $sandbox['progress']++;
      $context['results']['teas']++;
      // Tally only the nodes saved
      if ($node_saved) {
        $context['results']['nodes']++;
        $context['results']['nodes_' . $node_saved]++;
      }
 
      // Build a message so this isn't entirely boring for admins
      $msg = '<h2>' . t('Processing API data to site content...') . '</h2>';
      $msg .= t('Processed @p of @t Teas, @n new & @u updated', [
        '@p' => $sandbox['progress'],
        '@t' => $sandbox['max'],
        '@n' => $context['results']['nodes_inserted'],
        '@u' => $context['results']['nodes_updated'],
      ]);
      $msg .= '<br />';
      $msg .= t('Last tea: %t %g %n', [
        '%t' => $tea->getTitle(),
        '%g' => '(GID:' . $gid . ')',
        '%n' => '(node:' . $tea->getNode()->id() . ')',
      ]);
      $context['message'] = $msg;
    }
 
    if ($sandbox['max']) {
      $context['finished'] = $sandbox['progress'] / $sandbox['max'];
    }
  }
 
  ...
}

Finally, batch finished function needs to unlock cron so that it may do its part in updating the data.

drupal/modules/custom/iguana/src/Form/IguanaTeaImportForm.php (continued):
<?php
...
 
class IguanaTeaImportForm extends FormBase {
  ...
 
  /**
   * Reports the results of the Tea import operations.
   *
   * @param bool  $success
   * @param array $results
   * @param array $operations
   */
  public static function finishedBatch($success, $results, $operations) {
    // Unlock to allow cron to update the data later
    \Drupal::state()->set('iguana.tea_import_semaphore', FALSE);
    // The 'success' parameter means no fatal PHP errors were detected. All
    // other error management should be handled using 'results'.
    $downloaded = t('Finished with an error.');
    $processed  = FALSE;
    $saved      = FALSE;
    $inserted   = FALSE;
    $updated    = FALSE;
    if ($success) {
      $downloaded = \Drupal::translation()->formatPlural(
        $results['downloaded'],
        'One tea downloaded.',
        '@count teas downloaded.'
      );
      $processed  = \Drupal::translation()->formatPlural(
        $results['teas'],
        'One tea processed.',
        '@count teas processed.'
      );
      $saved      = \Drupal::translation()->formatPlural(
        $results['nodes'],
        'One node saved.',
        '@count nodes saved.'
      );
      $inserted   = \Drupal::translation()->formatPlural(
        $results['nodes_inserted'],
        'One was created.',
        '@count were created.'
      );
      $updated    = \Drupal::translation()->formatPlural(
        $results['nodes_updated'],
        'One was updated.',
        '@count were updated.'
      );
    }
    drupal_set_message($downloaded);
    if ($processed) {
      drupal_set_message($processed);
    };
    if ($saved) {
      drupal_set_message($saved);
    };
    if ($inserted) {
      drupal_set_message($inserted);
    };
    if ($updated) {
      drupal_set_message($updated);
    };
  }
 
}

Once I had my API-to-node class working and where all of the data was being imported, building out the cron portion was fairly easy.

Cron

From working on the data processing during the batch operations, I noticed that downloading the data directly to database table processed faster than saving the node data. So for cron, I started with gathering all of the data during the hook_cron() call and then queued up jobs to process that data. Since the Iguana API returns only the recent Tea revisions that were done since the start date, I assumed that only a manageable count will be downloaded at any given cron run. If the whole data set is updated, then it might require running the batch operation manually. The implementation of hook_cron() simply does a few minor checks, downloads any new content, and queues up enough jobs to process the data.

drupal/modules/custom/iguana/iguana.module:
<?php
 
/**
 * @file
 * Iguana API integration module file.
 */
 
use Drupal\Core\Database\Database;
use Drupal\iguana\IguanaConnection;
 
/**
 * Implements hook_cron().
 */
function iguana_cron() {
  $state     = \Drupal::state();
  $locked    = $state->get('iguana.tea_import_semaphore', FALSE);
  $last_time = $state->get('iguana.tea_import_last', FALSE);
 
  if (!$locked && $last_time) {
    $database   = Database::getConnection();
    $iguana     = new IguanaConnection();
    $queue      = \Drupal::queue('iguana_tea_import_worker');
    $api_limit  = $state->get('iguana.cron_download_limit', 100);
    $save_limit = $state->get('iguana.cron_process_limit', 10);
    $data       = NULL;
    $new_data   = [];
 
    // Pull all data into an array
    // TODO: limit checks in case all of the thousands of Teas have new
    // revisions
    do {
      // If there is have a 'next' URL returned, use that one for simplicity
      $next_page = NULL;
      if (isset($data->pagination->next)) {
        $next_page = $data->pagination->next;
      }
      $data = $iguana->queryEndpoint('teasDetailFull', [
        'limit'      => $api_limit,
        'start_time' => $last_time,
        'next_page'  => isset($next_page) ? $next_page : NULL,
      ]);
      $new_data = array_merge($new_data, $data->response_data);
    } while (isset($data->pagination->next));
 
    $gids      = [];
    $new_count = count($new_data);
    foreach ($new_data as $index => $tea_data) {
      if (empty($tea_data->gid)) {
        \Drupal::logger('iguana')->warning(t('Empty GID at progress @p for the data:<br /><pre>@v</pre>', [
          '@v' => print_r($tea_data, TRUE),
          '@p' => $index,
        ]));
        continue;
      }
      elseif (!is_numeric($tea_data->gid)) {
        \Drupal::logger('iguana')->warning(t('Non-numeric GID at progress @p for the data:<br /><pre>@v</pre>', [
          '@v' => print_r($tea_data, TRUE),
          '@p' => $index,
        ]));
        continue;
      }
      // Save the data to the local database
      $database->merge('iguana_tea_staging')
        ->key(['gid' => (int) $tea_data->gid])
        ->insertFields([
          'gid'  => (int) $tea_data->gid,
          'data' => serialize($tea_data),
        ])
        ->updateFields(['data' => serialize($tea_data)])
        ->execute()
      ;
      $gids[] = (int) $tea_data->gid;
      // If enough Teas have been stored or the last one just was strored,
      // then queue up a worker to process them and reset the IDs array
      if (count($gids) == $save_limit || $index + 1 == $new_count) {
        $queue->createItem(['gids' => $gids]);
        $gids = [];
      }
    }
    // Store the timestamp in state
    $last_time = \Drupal::time()->getRequestTime();
    \Drupal::state()->set('iguana.tea_import_last', $last_time);
  }
  elseif ($locked) {
    \Drupal::logger('iguana')->warning(t('Iguana Cron did not run because it is locked.'));
  }
}

If you are unfamiliar with Drupal's cron queue system, Drupal will run all of the implementations of hook_cron() and then processes as many queued jobs as it thinks it can before the PHP timeout. When a cron job is queued up, it's registered as an entry in the queue table of the database with a machine-name that's associated with a worker class and a bit of serialized data and, when it's picked up for processing, that class is instantiated with that bit of data unserialized. There is no guarantee which job is processed, so rather than running down a database query like in the batch operations, I chose to pass the worker which set of GIDs to process.

drupal/modules/custom/iguana/src/Plugin/QueueWorker/IguanaTeaImportWorker.php:
<?php
 
namespace Drupal\iguana\Plugin\QueueWorker;
 
use Drupal\Core\Queue\QueueWorkerBase;
use Drupal\Core\Database\Database;
use Drupal\iguana\IguanaTea;
 
/**
 * Updates Tea(s) from Iguana API data.
 *
 * @QueueWorker(
 *   id = "iguana_tea_import_worker",
 *   title = @Translation("Iguana Tea Import Worker"),
 *   cron = {"time" = 60}
 * )
 */
class IguanaTeaImportWorker extends QueueWorkerBase {
 
  /**
   * {@inheritdoc}
   */
  public function processItem($data) {
    $connection = Database::getConnection();
    $gids       = $data['gids'];
 
    if (empty($gids)) {
      \Drupal::logger('iguana')->warning(t('IguanaTeaImportWorker queue with no GPMS IDs!'));
      return;
    }
 
    $query = $connection->select('iguana_tea_staging', 'its');
    $query->fields('its');
    $query->condition('its.gid', $gids, 'IN');
    $results = $query->execute();
 
    foreach ($results as $row) {
      $gid      = (int) $row->gid;
      $tea_data = unserialize($row->data);
 
      try {
        $tea = new IguanaTea($tea_data);
        $tea->processTea(); // Custom data-to-node processing
 
        $connection->merge('iguana_tea_previous')
          ->key(['gid' => $gid])
          ->insertFields([
            'gid'  => $gid,
            'data' => $row->data,
          ])
          ->updateFields(['data' => $row->data])
          ->execute();
 
        $query = $connection->delete('iguana_tea_staging');
        $query->condition('gid', $gid);
        $query->execute();
      } catch (\Exception $e) {
        watchdog_exception('iguana', $e);
      }
    }
  }
 
}

Finally, Ultimate Cron was used so that Drupal's cron is triggered every minute to process the queue, but the various implementations of hook_cron() can be set to run at different intervals. So, iguana_cron() can be ran every quarter hour to queue up jobs that will take a minute or so to get through.

Filed under:

Really good blog. Was searching for blog related to Batch Processing and landed here. Great explaination! Thanks!

Add new comment

Restricted HTML

  • Allowed HTML tags: <a href hreflang> <em> <strong> <cite> <blockquote cite> <code> <ul type> <ol start type> <li> <dl> <dt> <dd> <h2 id> <h3 id> <h4 id> <h5 id> <h6 id>
  • You can enable syntax highlighting of source code with the following tags: <code>, <blockcode>, <cpp>, <java>, <php>. The supported tag styles are: <foo>, [foo].
  • Web page addresses and email addresses turn into links automatically.
  • Lines and paragraphs break automatically.

About the Author

Marcus Bernal, Software Engineer

Marcus started off out of high school programming Flash-based brochure CDs and websites for apartment communities. Since then, he has been developing LAMP-stack websites and, recently, Drupal modules. With the mind of a nerd, he has studied both computer science and civil engineering, so tackling large systems is his forte.

In his spare time, Marcus loves searching for new music, hiking/camping/backpacking, skateboarding, but, most importantly, being a father.