00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022 #include "connection.h"
00023 #include "connection_p.h"
00024
00025 #include <errno.h>
00026
00027 #include <QQueue>
00028 #include <QPointer>
00029 #include <QTime>
00030
00031 #include "kdebug.h"
00032 #include "kcomponentdata.h"
00033 #include "kglobal.h"
00034 #include "klocale.h"
00035 #include "kstandarddirs.h"
00036 #include "ktemporaryfile.h"
00037 #include "kurl.h"
00038
00039 using namespace KIO;
00040
00041 class KIO::ConnectionPrivate
00042 {
00043 public:
00044 inline ConnectionPrivate()
00045 : backend(0), suspended(false)
00046 { }
00047
00048 void dequeue();
00049 void commandReceived(const Task &task);
00050 void disconnected();
00051 void setBackend(AbstractConnectionBackend *b);
00052
00053 QQueue<Task> outgoingTasks;
00054 QQueue<Task> incomingTasks;
00055 AbstractConnectionBackend *backend;
00056 Connection *q;
00057 bool suspended;
00058 };
00059
00060 class KIO::ConnectionServerPrivate
00061 {
00062 public:
00063 inline ConnectionServerPrivate()
00064 : backend(0)
00065 { }
00066
00067 ConnectionServer *q;
00068 AbstractConnectionBackend *backend;
00069 };
00070
00071 void ConnectionPrivate::dequeue()
00072 {
00073 if (!backend || suspended)
00074 return;
00075
00076 while (!outgoingTasks.isEmpty()) {
00077 const Task task = outgoingTasks.dequeue();
00078 q->sendnow(task.cmd, task.data);
00079 }
00080
00081 if (!incomingTasks.isEmpty())
00082 emit q->readyRead();
00083 }
00084
00085 void ConnectionPrivate::commandReceived(const Task &task)
00086 {
00087
00088 if (!suspended && incomingTasks.isEmpty())
00089 QMetaObject::invokeMethod(q, "dequeue", Qt::QueuedConnection);
00090 incomingTasks.enqueue(task);
00091 }
00092
00093 void ConnectionPrivate::disconnected()
00094 {
00095 q->close();
00096 QMetaObject::invokeMethod(q, "readyRead", Qt::QueuedConnection);
00097 }
00098
00099 void ConnectionPrivate::setBackend(AbstractConnectionBackend *b)
00100 {
00101 backend = b;
00102 if (backend) {
00103 q->connect(backend, SIGNAL(commandReceived(Task)), SLOT(commandReceived(Task)));
00104 q->connect(backend, SIGNAL(disconnected()), SLOT(disconnected()));
00105 backend->setSuspended(suspended);
00106 }
00107 }
00108
00109 AbstractConnectionBackend::AbstractConnectionBackend(QObject *parent)
00110 : QObject(parent), state(Idle)
00111 {
00112 }
00113
00114 AbstractConnectionBackend::~AbstractConnectionBackend()
00115 {
00116 }
00117
00118 SocketConnectionBackend::SocketConnectionBackend(Mode m, QObject *parent)
00119 : AbstractConnectionBackend(parent), socket(0), len(-1), cmd(0),
00120 signalEmitted(false), mode(m)
00121 {
00122 localServer = 0;
00123
00124 }
00125
00126 SocketConnectionBackend::~SocketConnectionBackend()
00127 {
00128 if (mode == LocalSocketMode && localServer &&
00129 localServer->localSocketType() == KLocalSocket::UnixSocket)
00130 QFile::remove(localServer->localPath());
00131 }
00132
00133 void SocketConnectionBackend::setSuspended(bool enable)
00134 {
00135 if (state != Connected)
00136 return;
00137 Q_ASSERT(socket);
00138 Q_ASSERT(!localServer);
00139
00140 if (enable) {
00141
00142 socket->setReadBufferSize(1);
00143 } else {
00144
00145 socket->setReadBufferSize(0);
00146 if (socket->bytesAvailable() >= HeaderSize) {
00147
00148 QMetaObject::invokeMethod(this, "socketReadyRead", Qt::QueuedConnection);
00149 }
00150
00151
00152
00153
00154 QByteArray data = socket->read(socket->bytesAvailable() + 1);
00155 for (int i = data.size(); --i >= 0; )
00156 socket->ungetChar(data[i]);
00157 }
00158 }
00159
00160 bool SocketConnectionBackend::connectToRemote(const KUrl &url)
00161 {
00162 Q_ASSERT(state == Idle);
00163 Q_ASSERT(!socket);
00164 Q_ASSERT(!localServer);
00165
00166 if (mode == LocalSocketMode) {
00167 KLocalSocket *sock = new KLocalSocket(this);
00168 QString path = url.path();
00169 KLocalSocket::LocalSocketType type = KLocalSocket::UnixSocket;
00170
00171 if (url.queryItem(QLatin1String("abstract")) == QLatin1String("1"))
00172 type = KLocalSocket::AbstractUnixSocket;
00173
00174 sock->connectToPath(path);
00175 socket = sock;
00176 } else {
00177 socket = new QTcpSocket(this);
00178 socket->connectToHost(url.host(),url.port());
00179
00180 if (!socket->waitForConnected(1000)) {
00181 state = Idle;
00182 kDebug() << "could not connect to " << url;
00183 return false;
00184 }
00185 }
00186 connect(socket, SIGNAL(readyRead()), SLOT(socketReadyRead()));
00187 connect(socket, SIGNAL(disconnected()), SLOT(socketDisconnected()));
00188 state = Connected;
00189 return true;
00190 }
00191
00192 void SocketConnectionBackend::socketDisconnected()
00193 {
00194 state = Idle;
00195 emit disconnected();
00196 }
00197
00198 bool SocketConnectionBackend::listenForRemote()
00199 {
00200 Q_ASSERT(state == Idle);
00201 Q_ASSERT(!socket);
00202 Q_ASSERT(!localServer);
00203
00204 if (mode == LocalSocketMode) {
00205 QString prefix = KStandardDirs::locateLocal("socket", KGlobal::mainComponent().componentName());
00206 KTemporaryFile *socketfile = new KTemporaryFile();
00207 socketfile->setPrefix(prefix);
00208 socketfile->setSuffix(QLatin1String(".slave-socket"));
00209 if (!socketfile->open())
00210 {
00211 errorString = i18n("Unable to create io-slave: %1", strerror(errno));
00212 delete socketfile;
00213 return false;
00214 }
00215
00216 QString sockname = socketfile->fileName();
00217 KUrl addressUrl(sockname);
00218 addressUrl.setProtocol("local");
00219 address = addressUrl.url();
00220 delete socketfile;
00221
00222 localServer = new KLocalSocketServer(this);
00223 if (!localServer->listen(sockname, KLocalSocket::UnixSocket)) {
00224 errorString = localServer->errorString();
00225 delete localServer;
00226 return false;
00227 }
00228
00229 connect(localServer, SIGNAL(newConnection()), SIGNAL(newConnection()));
00230 } else {
00231 tcpServer = new QTcpServer(this);
00232 tcpServer->listen(QHostAddress::LocalHost);
00233 if (!tcpServer->isListening()) {
00234 errorString = tcpServer->errorString();
00235 delete tcpServer;
00236 return false;
00237 }
00238
00239 address = "tcp://127.0.0.1:" + QString::number(tcpServer->serverPort());
00240 connect(tcpServer, SIGNAL(newConnection()), SIGNAL(newConnection()));
00241 }
00242
00243 state = Listening;
00244 return true;
00245 }
00246
00247 bool SocketConnectionBackend::waitForIncomingTask(int ms)
00248 {
00249 Q_ASSERT(state == Connected);
00250 Q_ASSERT(socket);
00251 if (socket->state() != QAbstractSocket::ConnectedState) {
00252 state = Idle;
00253 return false;
00254 }
00255
00256 signalEmitted = false;
00257 if (socket->bytesAvailable())
00258 socketReadyRead();
00259 if (signalEmitted)
00260 return true;
00261
00262
00263 QTime timer;
00264 timer.start();
00265
00266 while (socket->state() == QAbstractSocket::ConnectedState && !signalEmitted &&
00267 (ms == -1 || timer.elapsed() < ms))
00268 if (!socket->waitForReadyRead(ms == -1 ? -1 : ms - timer.elapsed()))
00269 break;
00270
00271 if (signalEmitted)
00272 return true;
00273 if (socket->state() != QAbstractSocket::ConnectedState)
00274 state = Idle;
00275 return false;
00276 }
00277
00278 bool SocketConnectionBackend::sendCommand(const Task &task)
00279 {
00280 Q_ASSERT(state == Connected);
00281 Q_ASSERT(socket);
00282
00283 static char buffer[HeaderSize + 2];
00284 sprintf(buffer, "%6x_%2x_", task.data.size(), task.cmd);
00285 socket->write(buffer, HeaderSize);
00286 socket->write(task.data);
00287
00288
00289
00290
00291
00292
00293 while (socket->bytesToWrite() > 0 && socket->state() == QAbstractSocket::ConnectedState)
00294 socket->waitForBytesWritten(-1);
00295
00296 return socket->state() == QAbstractSocket::ConnectedState;
00297 }
00298
00299 AbstractConnectionBackend *SocketConnectionBackend::nextPendingConnection()
00300 {
00301 Q_ASSERT(state == Listening);
00302 Q_ASSERT(localServer || tcpServer);
00303 Q_ASSERT(!socket);
00304
00305
00306
00307 QTcpSocket *newSocket;
00308 if (mode == LocalSocketMode)
00309 newSocket = localServer->nextPendingConnection();
00310 else
00311 newSocket = tcpServer->nextPendingConnection();
00312 if (!newSocket)
00313 return 0;
00314
00315 SocketConnectionBackend *result = new SocketConnectionBackend(Mode(mode));
00316 result->state = Connected;
00317 result->socket = newSocket;
00318 newSocket->setParent(result);
00319 connect(newSocket, SIGNAL(readyRead()), result, SLOT(socketReadyRead()));
00320 connect(newSocket, SIGNAL(disconnected()), result, SLOT(socketDisconnected()));
00321
00322 return result;
00323 }
00324
00325 void SocketConnectionBackend::socketReadyRead()
00326 {
00327 bool shouldReadAnother;
00328 do {
00329 if (!socket)
00330
00331 return;
00332
00333
00334 if (len == -1) {
00335
00336 static char buffer[HeaderSize];
00337
00338 if (socket->bytesAvailable() < HeaderSize) {
00339 return;
00340 }
00341
00342 socket->read(buffer, sizeof buffer);
00343 buffer[6] = 0;
00344 buffer[9] = 0;
00345
00346 char *p = buffer;
00347 while( *p == ' ' ) p++;
00348 len = strtol( p, 0L, 16 );
00349
00350 p = buffer + 7;
00351 while( *p == ' ' ) p++;
00352 cmd = strtol( p, 0L, 16 );
00353
00354
00355
00356 }
00357
00358 QPointer<SocketConnectionBackend> that = this;
00359
00360
00361 if (socket->bytesAvailable() >= len) {
00362 Task task;
00363 task.cmd = cmd;
00364 if (len)
00365 task.data = socket->read(len);
00366 len = -1;
00367
00368 signalEmitted = true;
00369 emit commandReceived(task);
00370 }
00371
00372
00373 if (that.isNull())
00374 return;
00375
00376
00377 if (len == -1)
00378 shouldReadAnother = socket->bytesAvailable() >= HeaderSize;
00379 else
00380 shouldReadAnother = socket->bytesAvailable() >= len;
00381 }
00382 while (shouldReadAnother);
00383 }
00384
00385 Connection::Connection(QObject *parent)
00386 : QObject(parent), d(new ConnectionPrivate)
00387 {
00388 d->q = this;
00389 }
00390
00391 Connection::~Connection()
00392 {
00393 close();
00394 delete d;
00395 }
00396
00397 void Connection::suspend()
00398 {
00399
00400 d->suspended = true;
00401 if (d->backend)
00402 d->backend->setSuspended(true);
00403 }
00404
00405 void Connection::resume()
00406 {
00407
00408 QMetaObject::invokeMethod(this, "dequeue", Qt::QueuedConnection);
00409
00410
00411 d->suspended = false;
00412 if (d->backend)
00413 d->backend->setSuspended(false);
00414 }
00415
00416 void Connection::close()
00417 {
00418 if (d->backend) {
00419 d->backend->disconnect(this);
00420 d->backend->deleteLater();
00421 d->backend = 0;
00422 }
00423 d->outgoingTasks.clear();
00424 d->incomingTasks.clear();
00425 }
00426
00427 bool Connection::isConnected() const
00428 {
00429 return d->backend && d->backend->state == AbstractConnectionBackend::Connected;
00430 }
00431
00432 bool Connection::inited() const
00433 {
00434 return d->backend;
00435 }
00436
00437 bool Connection::suspended() const
00438 {
00439 return d->suspended;
00440 }
00441
00442 void Connection::connectToRemote(const QString &address)
00443 {
00444
00445 KUrl url = address;
00446 QString scheme = url.protocol();
00447
00448 if (scheme == QLatin1String("local")) {
00449 d->setBackend(new SocketConnectionBackend(SocketConnectionBackend::LocalSocketMode, this));
00450 } else if (scheme == QLatin1String("tcp")) {
00451 d->setBackend(new SocketConnectionBackend(SocketConnectionBackend::TcpSocketMode, this));
00452 } else {
00453 kWarning(7017) << "Unknown requested KIO::Connection protocol='" << scheme
00454 << "' (" << address << ")";
00455 Q_ASSERT(0);
00456 return;
00457 }
00458
00459
00460 if (!d->backend->connectToRemote(url)) {
00461
00462 delete d->backend;
00463 d->backend = 0;
00464 return;
00465 }
00466
00467 d->dequeue();
00468 }
00469
00470 QString Connection::errorString() const
00471 {
00472 if (d->backend)
00473 return d->backend->errorString;
00474 return QString();
00475 }
00476
00477 bool Connection::send(int cmd, const QByteArray& data)
00478 {
00479 if (!inited() || !d->outgoingTasks.isEmpty()) {
00480 Task task;
00481 task.cmd = cmd;
00482 task.data = data;
00483 d->outgoingTasks.enqueue(task);
00484 return true;
00485 } else {
00486 return sendnow(cmd, data);
00487 }
00488 }
00489
00490 bool Connection::sendnow(int _cmd, const QByteArray &data)
00491 {
00492 if (data.size() > 0xffffff)
00493 return false;
00494
00495 if (!isConnected())
00496 return false;
00497
00498
00499 Task task;
00500 task.cmd = _cmd;
00501 task.data = data;
00502 return d->backend->sendCommand(task);
00503 }
00504
00505 bool Connection::hasTaskAvailable() const
00506 {
00507 return !d->incomingTasks.isEmpty();
00508 }
00509
00510 bool Connection::waitForIncomingTask(int ms)
00511 {
00512 if (!isConnected())
00513 return false;
00514
00515 if (d->backend)
00516 return d->backend->waitForIncomingTask(ms);
00517 return false;
00518 }
00519
00520 int Connection::read( int* _cmd, QByteArray &data )
00521 {
00522
00523 if (d->incomingTasks.isEmpty()) {
00524
00525 return -1;
00526 }
00527 const Task task = d->incomingTasks.dequeue();
00528
00529
00530 *_cmd = task.cmd;
00531 data = task.data;
00532
00533
00534 if (!d->suspended && !d->incomingTasks.isEmpty())
00535 QMetaObject::invokeMethod(this, "dequeue", Qt::QueuedConnection);
00536
00537 return data.size();
00538 }
00539
00540 ConnectionServer::ConnectionServer(QObject *parent)
00541 : QObject(parent), d(new ConnectionServerPrivate)
00542 {
00543 d->q = this;
00544 }
00545
00546 ConnectionServer::~ConnectionServer()
00547 {
00548 delete d;
00549 }
00550
00551 void ConnectionServer::listenForRemote()
00552 {
00553 #ifdef Q_WS_WIN
00554 d->backend = new SocketConnectionBackend(SocketConnectionBackend::TcpSocketMode, this);
00555 #else
00556 d->backend = new SocketConnectionBackend(SocketConnectionBackend::LocalSocketMode, this);
00557 #endif
00558 if (!d->backend->listenForRemote()) {
00559 delete d->backend;
00560 d->backend = 0;
00561 return;
00562 }
00563
00564 connect(d->backend, SIGNAL(newConnection()), SIGNAL(newConnection()));
00565 kDebug(7017) << "Listening on " << d->backend->address;
00566 }
00567
00568 QString ConnectionServer::address() const
00569 {
00570 if (d->backend)
00571 return d->backend->address;
00572 return QString();
00573 }
00574
00575 bool ConnectionServer::isListening() const
00576 {
00577 return d->backend && d->backend->state == AbstractConnectionBackend::Listening;
00578 }
00579
00580 void ConnectionServer::close()
00581 {
00582 delete d->backend;
00583 d->backend = 0;
00584 }
00585
00586 Connection *ConnectionServer::nextPendingConnection()
00587 {
00588 if (!isListening())
00589 return 0;
00590
00591 AbstractConnectionBackend *newBackend = d->backend->nextPendingConnection();
00592 if (!newBackend)
00593 return 0;
00594
00595 Connection *result = new Connection;
00596 result->d->setBackend(newBackend);
00597 newBackend->setParent(result);
00598
00599 return result;
00600 }
00601
00602 void ConnectionServer::setNextPendingConnection(Connection *conn)
00603 {
00604 AbstractConnectionBackend *newBackend = d->backend->nextPendingConnection();
00605 Q_ASSERT(newBackend);
00606
00607 conn->d->backend = newBackend;
00608 conn->d->setBackend(newBackend);
00609 newBackend->setParent(conn);
00610
00611 conn->d->dequeue();
00612 }
00613
00614 #include "connection_p.moc"
00615 #include "connection.moc"