Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

Commit

Permalink
Browse files Browse the repository at this point in the history
[subscriber] thread-safe ContextProperty lifetime control
Signed-off-by: Denis Zalevskiy <denis.zalevskiy@jolla.com>
  • Loading branch information
Denis Zalevskiy committed Apr 9, 2015
1 parent fe84bb4 commit bbc65ad
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 20 deletions.
41 changes: 28 additions & 13 deletions src/contextkit-subscriber/property.cpp
Expand Up @@ -265,7 +265,7 @@ Event::~Event() {}
class SubscribeRequest : public Event
{
public:
SubscribeRequest(ContextPropertyPrivate const *tgt
SubscribeRequest(QSharedPointer<Adapter> tgt
, QString const&key
, std::promise<QVariant> &&res)
: Event(Event::Subscribe)
Expand All @@ -275,7 +275,7 @@ class SubscribeRequest : public Event
{}
virtual ~SubscribeRequest() {}

ContextPropertyPrivate const *tgt_;
QSharedPointer<Adapter> tgt_;
QString key_;
std::promise<QVariant> value_;

Expand All @@ -287,7 +287,7 @@ class SubscribeRequest : public Event
class UnsubscribeRequest : public Event
{
public:
UnsubscribeRequest(ContextPropertyPrivate const *tgt
UnsubscribeRequest(QSharedPointer<Adapter> tgt
, QString const &key
, std::promise<void> &&done)
: Event(Event::Unsubscribe)
Expand All @@ -299,7 +299,7 @@ class UnsubscribeRequest : public Event
done_.set_value();
}

ContextPropertyPrivate const *tgt_;
QSharedPointer<Adapter> tgt_;
QString key_;
std::promise<void> done_;
};
Expand Down Expand Up @@ -333,15 +333,15 @@ class WriteRequest : public QObject, public Event
class RefreshRequest : public Event
{
public:
RefreshRequest(ContextPropertyPrivate const *tgt
RefreshRequest(QSharedPointer<Adapter> tgt
, QString const &key)
: Event(Event::Refresh)
, tgt_(tgt)
, key_(key)
{}
virtual ~RefreshRequest() {}

ContextPropertyPrivate const *tgt_;
QSharedPointer<Adapter> tgt_;
QString key_;
};

Expand Down Expand Up @@ -427,7 +427,7 @@ void PropertyMonitor::subscribe(SubscribeRequest *req)
auto notify_fn = [req, &retval, tgt]() {
req->value_.set_value(retval);
QMetaObject::invokeMethod
(const_cast<ContextPropertyPrivate*>(tgt), "onChanged"
(const_cast<Adapter*>(tgt.data()), "onChanged"
, Qt::QueuedConnection, Q_ARG(QVariant, retval));
};
auto notify_on_exit = cor::on_scope_exit([notify_fn]() {
Expand All @@ -441,7 +441,7 @@ void PropertyMonitor::subscribe(SubscribeRequest *req)

auto it = targets_.find(key);
if (it == targets_.end()) {
it = targets_.insert(key, QSet<ContextPropertyPrivate const*>());
it = targets_.insert(key, QSet<QSharedPointer<Adapter> >());
it->insert(tgt);
handler = add(key);
} else {
Expand All @@ -453,7 +453,8 @@ void PropertyMonitor::subscribe(SubscribeRequest *req)
}

connect(handler, SIGNAL(changed(QVariant))
, tgt, SLOT(onChanged(QVariant)));
, tgt.data(), SLOT(onChanged(QVariant))
, Qt::QueuedConnection);

retval = handler->subscribe();
}
Expand Down Expand Up @@ -653,6 +654,19 @@ void Property::unsubscribe()
file_.close();
}

void Adapter::detach()
{
target_ = nullptr;
}

void Adapter::onChanged(QVariant v)
{
if (target_)
target_->onChanged(std::move(v));
else
debug::info("Do not notify deleted target");
}

}


Expand All @@ -661,13 +675,14 @@ ContextPropertyPrivate::ContextPropertyPrivate(const QString &key, QObject *pare
, key_(key)
, state_(Initial)
, is_cached_(false)
, adapter_(new ckit::Adapter(this))
{
}

ContextPropertyPrivate::~ContextPropertyPrivate()
{
unsubscribe();
waitForUnsubscription();
adapter_->detach();
}

bool ContextPropertyPrivate::waitForUnsubscription() const
Expand Down Expand Up @@ -760,7 +775,7 @@ void ContextPropertyPrivate::subscribe() const
state_ = Subscribing;
std::promise<QVariant> res;
on_subscribed_ = res.get_future();
auto ev = new ckit::SubscribeRequest(this, key_, std::move(res));
auto ev = new ckit::SubscribeRequest(this->adapter_, key_, std::move(res));
actor()->postEvent(ev);
};
execute_nothrow(fn, __PRETTY_FUNCTION__);
Expand All @@ -774,7 +789,7 @@ void ContextPropertyPrivate::unsubscribe() const

std::promise<void> res;
on_unsubscribed_ = res.get_future();
auto ev = new ckit::UnsubscribeRequest(this, key_, std::move(res));
auto ev = new ckit::UnsubscribeRequest(this->adapter_, key_, std::move(res));
actor()->postEvent(ev);
state_ = Unsubscribing;
};
Expand Down Expand Up @@ -824,7 +839,7 @@ void ContextPropertyPrivate::setTypeCheck(bool)

void ContextPropertyPrivate::refresh() const
{
actor()->postEvent(new ckit::RefreshRequest(this, key_));
actor()->postEvent(new ckit::RefreshRequest(this->adapter_, key_));
}

ContextProperty::ContextProperty(const QString &key, QObject *parent)
Expand Down
30 changes: 26 additions & 4 deletions src/contextkit-subscriber/property.hpp
Expand Up @@ -20,6 +20,7 @@
#include <QSet>
#include <QMap>
#include <QSocketNotifier>
#include <QPointer>

class ContextPropertyInfo;
class QSocketNotifier;
Expand Down Expand Up @@ -109,7 +110,7 @@ class FileReader : public File
qtaround::debug::warning("Can't connect, invalid handle", key());
}
}

virtual void close();
mutable QScopedPointer<QSocketNotifier> notifier_;
};
Expand All @@ -126,6 +127,8 @@ class FileWriter : public File
bool write(QByteArray const &);
};

class Adapter;

class Property : public QObject
{
Q_OBJECT;
Expand Down Expand Up @@ -178,13 +181,30 @@ class PropertyMonitor : public QObject
void write(WriteRequest *);
void refresh(RefreshRequest*);

QMap<QString, QSet<ContextPropertyPrivate const*> > targets_;
QMap<QString, QSet<QSharedPointer<Adapter> > > targets_;
QMap<QString, Property*> properties_;

static std::once_flag once_;
static monitor_ptr instance_;
};

class Adapter : public QObject
{
Q_OBJECT;
public:
Adapter(ContextPropertyPrivate *target)
: target_(target)
{}


public slots:
void onChanged(QVariant);
private:
friend class ::ContextPropertyPrivate;
void detach();
ContextPropertyPrivate *target_;
};

}

class ContextPropertyPrivate : public QObject
Expand Down Expand Up @@ -216,10 +236,11 @@ class ContextPropertyPrivate : public QObject
signals:
void valueChanged() const;

public slots:
void onChanged(QVariant) const;
private:

friend class ckit::Adapter;
void onChanged(QVariant) const;

enum State {
Initial,
Unsubscribing,
Expand All @@ -237,6 +258,7 @@ public slots:

mutable std::future<QVariant> on_subscribed_;
mutable std::future<void> on_unsubscribed_;
mutable QSharedPointer<ckit::Adapter> adapter_;
};

#endif // _STATEFS_CKIT_PROPERTY_HPP_
22 changes: 19 additions & 3 deletions tests/subscriber.cpp
Expand Up @@ -23,26 +23,42 @@ enum test_ids {
tid_race_condition = 1
};

static QString property1Name("Unknown.NonExistent");
static QString property2Name("Battery.ChargePercentage");
static QString &propertyName = property1Name;

template<> template<>
void object::test<tid_race_condition>()
{
auto test_fn = [](std::function<void()> idle) {
for (int i = 0; i < 1000; ++i) {
auto p = new ContextProperty("Battery.ChargePercentage");
delete new ContextProperty(propertyName);
}
for (int i = 0; i < 1000; ++i) {
delete new ContextProperty(propertyName);
idle();
p->subscribe();
}
for (int i = 0; i < 1000; ++i) {
auto p = new ContextProperty(propertyName);
idle();
delete new ContextProperty("Battery.ChargePercentage");
p->subscribe();
idle();
p->unsubscribe();
idle();
p->subscribe();
idle();
delete p;
idle();
delete new ContextProperty(propertyName);
idle();
}
};
test_fn([]() {});
auto test_events_fn = std::bind(test_fn, idle_event_loop);
execute_in_event_loop(test_events_fn);
propertyName = property2Name;
test_fn([]() {});
execute_in_event_loop(test_events_fn);
}

}

0 comments on commit bbc65ad

Please sign in to comment.