11.3. Task Queue
Once we have Symbols in the cache, all it takes to persist them is to retrieve them from the cache one by one and persist each with a call to JDO. This simple task seems achievable through a single RPC, but it does not work as intended on App Engine. Let’s look at the limitations App Engine imposes.
An RPC has to complete within 60 seconds. When there are only a few symbols, the call may finish within the deadline, but as their number increases, persistence, being a resource-hungry operation, pushes the call past the deadline and App Engine begins to throw DeadlineExceededException. Next, we try to overcome this issue by invoking a separate RPC for each Symbol. While this avoids the deadline issue, it floods App Engine with many simultaneous RPCs, which occasionally results in persistence errors.
For long-running or resource-hungry processes, App Engine provides the Task Queue service, which allows us to organize work into small, discrete units called tasks and add them to a task queue for later execution. These tasks are initiated by a user request, but they run outside that request and are not bound by the usual request deadline.
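To make the deadline argument concrete, here is a small worked calculation. The 60-second limit is the RPC deadline quoted above; the per-symbol persistence cost and the symbol count are made-up numbers, and the class name is hypothetical. The point is only that one request's duration grows with the number of symbols, whereas a task that persists a single symbol stays short.

```java
public class DeadlineSketch {

    // RPC deadline quoted in the text.
    static final double RPC_DEADLINE_SEC = 60.0;

    // Estimated duration of one request that persists `symbols` symbols.
    static double requestSeconds(int symbols, double secondsPerSymbol) {
        return symbols * secondsPerSymbol;
    }

    static boolean fitsDeadline(int symbols, double secondsPerSymbol) {
        return requestSeconds(symbols, secondsPerSymbol) <= RPC_DEADLINE_SEC;
    }

    // Largest number of symbols one request can persist before hitting the deadline.
    static int maxSymbolsPerRequest(double secondsPerSymbol) {
        return (int) Math.floor(RPC_DEADLINE_SEC / secondsPerSymbol);
    }

    public static void main(String[] args) {
        double cost = 0.5;  // hypothetical: half a second to persist one Symbol
        int symbols = 200;  // hypothetical number of cached Symbols

        System.out.println("one RPC for all symbols fits: " + fitsDeadline(symbols, cost));
        System.out.println("one task per symbol fits: " + fitsDeadline(1, cost));
        System.out.println("max symbols per request: " + maxSymbolsPerRequest(cost));
    }
}
```

With these assumed numbers, 200 symbols need 100 seconds in one request, while each single-symbol task needs only half a second.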
Before we get into task queues, we have to make some preparations to access the App Engine datastore.
JDO Config
In an earlier chapter, we used jdoconfig.xml to configure a database connection to HSQLDB. To access the App Engine datastore, we need to modify the configuration as follows:
META-INF/jdoconfig.xml
<!-- GAE Datastore -->
<persistence-manager-factory name="datastore">
<property name="javax.jdo.PersistenceManagerFactoryClass"
value="org.datanucleus.api.jdo.JDOPersistenceManagerFactory" />
<property name="javax.jdo.option.ConnectionURL" value="appengine" />
<property name="javax.jdo.option.NontransactionalRead" value="true" />
<property name="javax.jdo.option.NontransactionalWrite"
value="true" />
<property name="javax.jdo.option.RetainValues" value="true" />
<property name="datanucleus.appengine.autoCreateDatastoreTxns"
value="true" />
<property name="datanucleus.cache.level2.type" value="soft" />
<property name="datanucleus.metadata.validate" value="false" />
</persistence-manager-factory>
If this configuration is missing from jdoconfig.xml, App Engine throws a rather confusing error: NoClassDefFoundError. The same error also crops up when the JDO version is not set to v2 in the App Engine Settings window. So, you know what to look for when you encounter this exception.
com.google.apphosting.utils.jetty.JettyLogger warn
WARNING: Error for /fins/gaeInsertTask
java.lang.NoClassDefFoundError: Could not initialize class in.fins.server.dao.jdo.PMF
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:186)
....
Identity
Earlier, in the domain classes DataGroup and Data, we used type long for id, which is also the primary key. These classes are also child objects of other classes because of one-to-many relationships, and when the primary key of a child object is of type long, App Engine JDO throws JDOFatalUserException.
javax.jdo.JDOFatalUserException: Error in meta-data for field
in.fins.shared.DataGroup.id : Cannot have a primary key of type long and
be a child object (owning field is in.fins.shared.Symbol.dataGroups").
To overcome this error, we have to use type String for the primary key id in the in.fins.shared.DataGroup and in.fins.shared.Data classes. Then, in the mapping file package.jdo, we map the field to the App Engine specific type gae.encoded-pk using the extension feature of JDO.
in.fins.shared/package.jdo
<class name="DataGroup">
<field name="id" primary-key="true" value-strategy="identity">
<extension vendor-name="datanucleus" key="gae.encoded-pk"
value="true" />
</field>
<field name="category" />
....
<class name="Data">
<field name="id" primary-key="true" value-strategy="identity">
<extension vendor-name="datanucleus" key="gae.encoded-pk"
value="true" />
</field>
<field name="date" />
....
This solves the App Engine specific error, but if you roll back to a regular app server, it complains that type String is not allowed for a primary key when value-strategy is identity. So, when you move the app to a regular app server, change the value-strategy attribute from identity to uuid-string (for HSQLDB) or auid (for MySQL), which should resolve the issue.
ID as String
It is good practice to use an id field of type long in domain classes and to use strategies like Join Table to overcome App Engine related issues. But we intentionally stick with the present domain model to highlight the issues with an id of type String.
With these preparations, we are ready to use App Engine’s datastore to persist our domain classes.
DataStorePanel
DataStorePanel is a custom widget with just one button, Persist Symbols.
Figure 11.5. DataStorePanel
Unlike UploadPanel and CachePanel, DataStorePanel uses GWT RPC instead of FormPanel and form submission.
The only interesting thing in this widget is the click handler, which makes an RPC call to the persistSymbols() method of SymbolService.
in.fins.client.widget/DataStorePanel.java
....
@UiHandler("persistButton")
void onInsertButtonClick(ClickEvent event) {
ISymbolServiceAsync dataStoreService = GWT.create(ISymbolService.class);
dataStoreService.persistSymbols("insert-queue",
new AsyncCallback<String>() {
@Override
public void onSuccess(String result) {
StatusEvent se = new StatusEvent(result);
EventBus.get().fireEvent(se);
}
@Override
public void onFailure(Throwable caught) {
StatusEvent se = new StatusEvent(caught.getMessage());
EventBus.get().fireEvent(se);
}
});
}
The string argument insert-queue passed to persistSymbols() tells the server which task queue to use to persist the symbols.
SymbolService RPC
The service method persistSymbols() is a new method in the existing SymbolService. Its definition is added to the ISymbolService and ISymbolServiceAsync interfaces in the client, and its implementation to SymbolService in the server.
in.fins.client.rpc/ISymbolService.java
@RemoteServiceRelativePath("symbolService")
public interface ISymbolService extends RemoteService {
....
public String persistSymbols(String qName);
}
in.fins.client.rpc/ISymbolServiceAsync.java
public interface ISymbolServiceAsync {
....
void persistSymbols(String qName, AsyncCallback<String> callback);
}
in.fins.server.service/SymbolService.java
@Override
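public String persistSymbols(String qName) {
// 1. based on the server type, instantiate GaePersistTask or PersistTask
//    and assign it to an ITask field
// 2. retrieve the list of symbol names from the cache
// 3. call ITask.execute() for each symbol name
....
}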
Task Interface
On regular app servers, the persistSymbols() method can retrieve Symbols from the cache one by one and persist them to the datastore using a DAO. On App Engine, however, the same approach doesn’t work properly. To handle these differences, the interaction between the SymbolService.persistSymbols() method and the underlying datastore goes through an interface, ITask, as shown in the next figure.
Figure 11.6. Task interface
The ITask interface contains a single method, execute(), which is implemented by the GaePersistTask and PersistTask classes. Based on the server type, the SymbolService.persistSymbols() method instantiates one of these two classes and assigns it to an ITask field. Then, it retrieves the list of symbol names from the cache and calls ITask.execute() for each symbol name.
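The selection logic can be sketched in plain Java as below. The ITask signature follows the PersistTask listing; the two implementations are hypothetical stand-ins that only record what they would do, and the server-type check is an assumption: it reads the system property com.google.appengine.runtime.environment, assumed to be set by App Engine and absent in a plain JVM. Symbol names are placeholders.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskSelectionSketch {

    // Same shape as the ITask interface used by PersistTask and GaePersistTask.
    interface ITask {
        void execute(Map<String, Object> parameters) throws Exception;
    }

    // Stand-in for PersistTask: would persist immediately; here it records the name.
    static class DirectPersistTask implements ITask {
        final List<String> persisted = new ArrayList<>();

        @Override
        public void execute(Map<String, Object> parameters) {
            persisted.add((String) parameters.get("symbolName"));
        }
    }

    // Stand-in for GaePersistTask: would add a task to a queue; here it records queue:symbol.
    static class QueueingTask implements ITask {
        final List<String> enqueued = new ArrayList<>();

        @Override
        public void execute(Map<String, Object> parameters) {
            enqueued.add(parameters.get("queueName") + ":" + parameters.get("symbolName"));
        }
    }

    // Assumption: this property is set on App Engine and absent in a plain JVM.
    static boolean runningOnAppEngine() {
        return System.getProperty("com.google.appengine.runtime.environment") != null;
    }

    static ITask chooseTask(boolean onAppEngine) {
        return onAppEngine ? new QueueingTask() : new DirectPersistTask();
    }

    // Mirrors persistSymbols(): one execute() call per cached symbol name.
    static void persistAll(ITask task, List<String> symbolNames, String queueName) throws Exception {
        for (String name : symbolNames) {
            Map<String, Object> parameters = new HashMap<>();
            parameters.put("symbolName", name);
            parameters.put("queueName", queueName);
            task.execute(parameters);
        }
    }

    public static void main(String[] args) throws Exception {
        ITask task = chooseTask(runningOnAppEngine());
        persistAll(task, List.of("AAA", "BBB", "CCC"), "insert-queue");
        System.out.println("selected: " + task.getClass().getSimpleName());
    }
}
```

Because SymbolService only talks to ITask, the loop stays the same on both server types; only the object behind the interface changes.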
Let’s have a look at PersistTask, which is used by regular app servers.
in.fins.server.task/PersistTask.java
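public class PersistTask implements ITask {
// java.util.logging.Logger used by persistSymbol()
private static final Logger log = Logger.getLogger(PersistTask.class.getName());
private DaoFactory daoFactory;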
@Override
public void execute(Map<String, Object> parameters) throws Exception {
String ormName = (String) parameters.get("ormName");
ORM orm = DaoFactory.getOrmType(ormName);
daoFactory = DaoFactory.getDaoFactory(orm);
String symbolName = (String) parameters.get("symbolName");
ICache cache = Utils.createInstance(ICache.class,
"in.fins.server.cache.SimpleCache");
cache.createCache();
Symbol symbol = (Symbol) cache.get(symbolName);
persistSymbol(symbol);
}
public void persistSymbol(Symbol symbol) throws Exception {
try {
IDao<Symbol> symbolDao = daoFactory.getDao(Symbol.class);
symbolDao.insert(symbol);
} catch (Exception e) {
log.warning(e.getMessage());
throw e;
}
}
}
SymbolService.persistSymbols() calls the execute() method, passing the symbol name and ORM type through parameters. Using these parameters, execute() obtains the symbol from the cache and also the appropriate DaoFactory. From the DaoFactory, it gets the SymbolDao and calls its insert() method to persist the symbol.
But things get slightly more complicated when Fins runs on App Engine, which uses GaePersistTask.
in.fins.server.task/GaePersistTask.java
public class GaePersistTask extends HttpServlet implements ITask {
private DaoFactory daoFactory;
@Override
public void execute(Map<String, Object> parameters) throws Exception {
String qName = (String) parameters.get("queueName");
String symbolName = (String) parameters.get("symbolName");
// add a task to Task Queue to
// persist a Symbol
}
Here, the execute() method obtains the symbol name and queue name from the parameters and then adds a task to the App Engine task queue. Later, App Engine dequeues the tasks one by one, and each task persists its symbol.
It is a two-step process: first we define a task, and then we add it to a queue. Let’s see how this is done in detail.
App Engine Tasks
An App Engine task is simply a regular servlet, and GaePersistTask extends HttpServlet so that it can act as an App Engine task. Please note that we combined ITask and HttpServlet into a single class, GaePersistTask, just to save a file; it may be split into two separate classes, one that implements ITask and another that acts as the task servlet.
When App Engine dequeues a task, it invokes the servlet and calls either the HttpServlet.doPost() or the HttpServlet.doGet() method of the task, depending on the task's HTTP method. Place the persistence code in the doPost() method, as shown below.
in.fins.server.task/GaePersistTask.java
@Override
protected void doPost(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
try {
ICache cache = Utils.createInstance(ICache.class,
"in.fins.server.cache.GaeCache");
String symbolName = req.getParameter("symbolName");
cache.createCache();
Symbol symbol = (Symbol) cache.get(symbolName);
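// this request may be served by a different GaePersistTask instance than the
// one whose execute() queued the task, so persistSymbol() cannot rely on state
// set in execute(); anything it needs must be initialised in this instance
persistSymbol(symbol);
} catch (Exception e) {
// the exception is logged and swallowed, so the handler returns HTTP 200 and
// App Engine treats the task as done (a non-2xx status would trigger a retry)
log.warning(e.getMessage());
}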
}
From the HttpServletRequest, it obtains the symbol name, retrieves the corresponding Symbol from the cache, and then persists the Symbol using DaoFactory and SymbolDao, as explained earlier for PersistTask.
Next, we map the task servlet to a URL, as we do for any other regular servlet, by adding a servlet mapping to the deployment descriptor web.xml.
war/WEB-INF/web.xml
<servlet>
<servlet-name>gaePersistTask</servlet-name>
<servlet-class>in.fins.server.task.GaePersistTask</servlet-class>
</servlet>
<servlet-mapping>
<servlet-name>gaePersistTask</servlet-name>
<url-pattern>/fins/gaePersistTask</url-pattern>
</servlet-mapping>
App Engine Queue
The Task Queue API supports push queues and pull queues. Push queues process tasks at the processing rate configured in the queue definition, while pull queues provide greater control over task processing and also allow tasks to be consumed by code external to the application. In Fins, we use push queues as they are simpler to use.
Queues are defined in the queue definition file, queue.xml. For convenience, App Engine provides a default push queue and inserts tasks that have no queue name into it. In addition to the default queue, we may configure additional named queues in queue.xml.
war/WEB-INF/queue.xml
<queue-entries>
<queue>
<name>insert-queue</name>
<rate>1/s</rate>
<bucket-size>10</bucket-size>
</queue>
</queue-entries>
The above definition configures a push queue named insert-queue with a bucket-size of 10 and a rate of 1 task per second. Task Queue uses the token bucket algorithm to control the rate of task execution: each task execution consumes one token, the rate directive tells App Engine to refill the bucket with one token per second, and the bucket-size directive says the bucket can hold at most 10 tokens.
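The sketch below simulates a token bucket with the two numbers from queue.xml (rate 1/s, bucket-size 10). It is a plain textbook token bucket for illustration, not App Engine's actual scheduler; the class name is hypothetical, time is passed in explicitly in seconds, and starting with a full bucket is an assumption.

```java
public class TokenBucketSketch {

    private final double ratePerSecond; // tokens added per second (<rate>)
    private final int bucketSize;       // maximum tokens held (<bucket-size>)
    private double tokens;
    private double lastRefillSec;

    public TokenBucketSketch(double ratePerSecond, int bucketSize) {
        this.ratePerSecond = ratePerSecond;
        this.bucketSize = bucketSize;
        this.tokens = bucketSize; // assumption: the bucket starts full
        this.lastRefillSec = 0.0;
    }

    // Add tokens for the time elapsed since the last refill, capped at bucketSize.
    private void refill(double nowSec) {
        double elapsed = nowSec - lastRefillSec;
        tokens = Math.min(bucketSize, tokens + elapsed * ratePerSecond);
        lastRefillSec = nowSec;
    }

    // Returns true if a task may run at time nowSec; a dispatched task consumes one token.
    public boolean tryDispatch(double nowSec) {
        refill(nowSec);
        if (tokens >= 1.0) {
            tokens -= 1.0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        TokenBucketSketch bucket = new TokenBucketSketch(1.0, 10);
        int burst = 0;
        for (int i = 0; i < 15; i++) {
            if (bucket.tryDispatch(0.0)) {
                burst++;
            }
        }
        System.out.println("dispatched at t=0: " + burst);             // 10
        System.out.println("at t=1: " + bucket.tryDispatch(1.0));       // true
        System.out.println("at t=1 again: " + bucket.tryDispatch(1.0)); // false
    }
}
```

Under these assumptions, a full bucket lets a burst of 10 tasks run at once; after that, tasks run at the refill rate of one per second.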
App Engine executes a push task by calling an application-specific URL, which means we have to provide the task's URL when we add it to the task queue. The following code, from GaePersistTask.execute(), adds a GaePersistTask task to insert-queue.
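in.fins.server.task/GaePersistTask.java
import static com.google.appengine.api.taskqueue.TaskOptions.Builder.withUrl;
import com.google.appengine.api.taskqueue.Queue;
import com.google.appengine.api.taskqueue.QueueFactory;
import com.google.appengine.api.taskqueue.TaskOptions;
import com.google.appengine.api.taskqueue.TaskOptions.Method;
....
TaskOptions task = withUrl("/fins/gaePersistTask")
.param("symbolName", symbolName)
.method(Method.POST);
Queue queue = QueueFactory.getQueue(qName);
queue.add(task);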
The TaskOptions.Builder.withUrl() method creates a task and lets us set various options. The calls above set the URL of the task to /fins/gaePersistTask, pass the symbol name as a request parameter, and set the method to POST. Then, the task is added to the queue. App Engine dequeues the tasks one by one by calling the URL /fins/gaePersistTask, which invokes GaePersistTask and calls its doPost() method with the symbol name as the request parameter.
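The push model can be simulated in memory to show the two phases: adding a task only records a URL and parameters, and nothing runs until the queue later "calls" the handler mapped to that URL. Everything here is hypothetical and plain Java: a map from URL to handler plays the role of the web.xml servlet mapping, and the sketch drains tasks in insertion order for simplicity.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class PushQueueSketch {

    // A queued task: target URL plus request parameters, loosely like TaskOptions.
    static final class Task {
        final String url;
        final Map<String, String> params;

        Task(String url, Map<String, String> params) {
            this.url = url;
            this.params = params;
        }
    }

    private final Deque<Task> pending = new ArrayDeque<>();
    private final Map<String, Consumer<Map<String, String>>> routes = new HashMap<>();

    // Plays the role of a servlet mapping in web.xml.
    void map(String url, Consumer<Map<String, String>> handler) {
        routes.put(url, handler);
    }

    // Plays the role of Queue.add(): only records the task, nothing runs yet.
    void add(Task task) {
        pending.addLast(task);
    }

    // Later, dequeue tasks and "POST" each one to the handler mapped to its URL.
    int drain() {
        int dispatched = 0;
        while (!pending.isEmpty()) {
            Task task = pending.pollFirst();
            Consumer<Map<String, String>> handler = routes.get(task.url);
            if (handler == null) {
                throw new IllegalStateException("no handler mapped to " + task.url);
            }
            handler.accept(task.params);
            dispatched++;
        }
        return dispatched;
    }

    public static void main(String[] args) {
        PushQueueSketch queue = new PushQueueSketch();
        List<String> persisted = new ArrayList<>();
        queue.map("/fins/gaePersistTask", params -> persisted.add(params.get("symbolName")));

        for (String name : List.of("AAA", "BBB", "CCC")) {
            queue.add(new Task("/fins/gaePersistTask", Map.of("symbolName", name)));
        }
        System.out.println("persisted before drain: " + persisted.size());
        System.out.println("dispatched: " + queue.drain());
        System.out.println("persisted: " + persisted);
    }
}
```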
Task instances
Now, the question is how many instances of GaePersistTask get created when we persist, say, three symbols. The answer: as many as four. First, we create an instance of GaePersistTask in the SymbolService.persistSymbols() method and call its execute() method, which adds three tasks to the task queue. For each task, App Engine invokes its URL, which creates a GaePersistTask servlet. In a normal app server, only one instance of a servlet gets created, but it is important to note that here the tasks are executed on a cloud platform, where each request may be executed on any App Engine instance, on any server, in any data center. The App Engine instance in Data Center X may create a GaePersistTask servlet to execute the first task, another servlet instance in Data Center Y may handle the second task, and so on.
The following figure helps clarify what plays out in the cloud when we persist three Symbols.
Figure 11.7. Task Queue
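The instance count can be reproduced with a small simulation. It assumes the worst case described above, where every task lands on a fresh instance; the class, field, and symbol names are hypothetical. It also shows a practical consequence: a field set in execute() on the first instance is not visible to doPost() on the others.

```java
import java.util.ArrayList;
import java.util.List;

public class TaskInstanceSketch {

    static int instancesCreated = 0;

    // Stand-in for GaePersistTask: one class playing both roles.
    static class PersistTaskServlet {
        String daoConfig; // set by execute(); stays null on other instances

        PersistTaskServlet() {
            instancesCreated++;
        }

        // Role 1: called from persistSymbols(); "queues" one task per symbol.
        List<String> execute(List<String> symbolNames) {
            daoConfig = "configured";
            return new ArrayList<>(symbolNames);
        }

        // Role 2: invoked by the queue, one request per task.
        String doPost(String symbolName) {
            return symbolName + " persisted with daoConfig=" + daoConfig;
        }
    }

    public static void main(String[] args) {
        PersistTaskServlet caller = new PersistTaskServlet();
        List<String> queued = caller.execute(List.of("AAA", "BBB", "CCC"));

        // Worst case: every task is served by a new instance.
        for (String symbolName : queued) {
            System.out.println(new PersistTaskServlet().doPost(symbolName));
        }
        System.out.println("instances created: " + instancesCreated); // 4
    }
}
```

Each doPost() line prints daoConfig=null, which is why a task servlet should set up whatever it needs inside the request it is handling.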
DataStore Insert
To persist symbols, IDao defines a new method, insert(), which is implemented by in.fins.server.dao.jdo.SymbolDao and in.fins.server.dao.hibernate.SymbolDao. We have not implemented it for MyBatis, where it is slightly more complicated than with the other two.
While persisting a symbol, the insert() method first checks whether a symbol with the same name already exists in the datastore; if so, it updates the existing symbol, else it inserts the new one.
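The insert-or-update logic can be sketched independently of any ORM. Here a HashMap keyed by symbol name stands in for the datastore, where the JDO and Hibernate SymbolDao would run a query by name instead; the Symbol fields and class names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

public class UpsertSketch {

    // Minimal stand-in for the Symbol domain class (hypothetical fields).
    static final class Symbol {
        final String name;
        String data;

        Symbol(String name, String data) {
            this.name = name;
            this.data = data;
        }
    }

    // In-memory stand-in for the datastore, keyed by symbol name.
    private final Map<String, Symbol> store = new HashMap<>();

    // Inserts the symbol, or updates the stored one with the same name; returns true on insert.
    boolean insert(Symbol symbol) {
        Symbol existing = store.get(symbol.name); // lookup by name
        if (existing != null) {
            existing.data = symbol.data;          // update the existing symbol
            return false;
        }
        store.put(symbol.name, symbol);           // insert a new symbol
        return true;
    }

    int size() {
        return store.size();
    }

    String dataOf(String name) {
        Symbol s = store.get(name);
        return s == null ? null : s.data;
    }

    public static void main(String[] args) {
        UpsertSketch dao = new UpsertSketch();
        System.out.println(dao.insert(new Symbol("AAA", "v1")));  // true  (inserted)
        System.out.println(dao.insert(new Symbol("AAA", "v2")));  // false (updated)
        System.out.println(dao.size() + " " + dao.dataOf("AAA")); // 1 v2
    }
}
```

Checking first means that persisting the same symbol twice, for instance by clicking Persist Symbols again, updates the stored symbol rather than duplicating it.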
Manage TaskQueue and DataStore
In development mode, App Engine provides a Development Console at http://localhost:8888/_ah/admin, which is useful for managing the datastore and queues. The Development Console has options to view or delete datastore entities. It also provides stats about the default and named task queues and an option to purge pending tasks.
In development mode, App Engine uses the binary file war/WEB-INF/appengine-generated/local_db.bin as its local datastore. If you want to zap the entire datastore, the hack is to delete this file and restart the development server.
The next section explains how to use App Engine Backends to keep the frontend free exclusively for user requests.