Skip to content
This repository has been archived by the owner on Sep 4, 2021. It is now read-only.

Commit

Permalink
[subscriber] Notify each about data once. Fixes MER#1410
Browse files Browse the repository at this point in the history
Each client is notified about changed data once until it reads the data from the
shared cache

Signed-off-by: Denis Zalevskiy <denis.zalevskiy@jolla.com>
  • Loading branch information
Denis Zalevskiy committed Nov 6, 2015
1 parent efa3ea4 commit ee4364e
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 17 deletions.
91 changes: 76 additions & 15 deletions src/contextkit-subscriber/property.cpp
Expand Up @@ -133,7 +133,8 @@ class Event : public QEvent
Write,
Refresh,
Data,
WriteStatus
WriteStatus,
Ready
};

virtual ~Event();
Expand Down Expand Up @@ -302,6 +303,15 @@ class ReplyEvent : public Event
target_handle tgt_;
};


class DataReadyEvent : public ReplyEvent
{
public:
DataReadyEvent(target_handle const &tgt)
: ReplyEvent(Event::Ready, tgt)
{}
};

class DataReplyEvent : public ReplyEvent
{
public:
Expand Down Expand Up @@ -474,10 +484,10 @@ void PropertyMonitor::write(WriteRequest *req)
}
}

void Property::changed(QVariant const &v) const
void Property::changed() const
{
for (auto tgt : targets_)
tgt->postEvent(new DataReplyEvent(v, tgt));
for (auto target : targets_)
target->dataReady(target);
}

bool Property::add(target_handle const &target)
Expand All @@ -486,6 +496,7 @@ bool Property::add(target_handle const &target)
auto it = targets_.find(target);
if (it == targets_.end()) {
targets_.insert(target);
target->attachCache(cache_);
res = true;
}
return res;
Expand Down Expand Up @@ -565,12 +576,25 @@ std::shared_ptr<Property> PropertyMonitor::add(const QString &key)
return it.value();
}

void Cache::store(QVariant v)
{
QMutexLocker lock(&mutex_);
data_ = v;
}

QVariant Cache::load() const
{
QMutexLocker lock(&mutex_);
return data_;
}

Property::Property(const QString &key, QObject *parent)
: QObject(parent)
, file_(key)
, reopen_interval_(100)
, reopen_timer_(new QTimer(this))
, is_subscribed_(false)
, cache_(std::make_shared<Cache>())
{
reopen_timer_->setSingleShot(true);
connect(reopen_timer_, SIGNAL(timeout()), this, SLOT(trySubscribe()));
Expand Down Expand Up @@ -622,7 +646,7 @@ bool Property::update()

if (!file_.tryOpen()) {
debug::warning("Can't open ", file_.fileName());
cache_ = statefs::qt::valueDefault(cache_);
cache_->store(statefs::qt::valueDefault(cache_->load()));
resubscribe();
return is_updated;
}
Expand Down Expand Up @@ -659,19 +683,24 @@ bool Property::update()
rc = bytes_read;
}
touchFile.close();
QVariant value, prev_value;
if (rc >= 0) {
buffer_[(int)rc] = '\0';
auto s = QString(buffer_);
prev_value = cache_->load();
if (s.size()) {
cache_ = statefs::qt::valueDecode(s);
value = statefs::qt::valueDecode(s);
} else {
if (cache_.isNull()) {
cache_ = s;
if (prev_value.isNull()) {
value = s;
} else {
cache_ = statefs::qt::valueDefault(cache_);
value = statefs::qt::valueDefault(prev_value);
}
}
is_updated = true;
if (value != prev_value) {
cache_->store(value);
is_updated = true;
}
} else {
debug::warning("Error accessing? ", rc, "..." + file_.fileName());
resubscribe();
Expand All @@ -683,12 +712,12 @@ bool Property::update()
void Property::handleActivated(int)
{
if (update())
changed(cache_);
changed();
}

QVariant Property::subscribe()
{
return (!is_subscribed_ ? subscribe_() : cache_);
return (!is_subscribed_ ? subscribe_() : cache_->load());
}

QVariant Property::subscribe_()
Expand All @@ -702,9 +731,9 @@ QVariant Property::subscribe_()
file_.connect(this, &Property::handleActivated);

if (update())
changed(cache_);
changed();

return cache_;
return cache_->load();
}

void Property::unsubscribe()
Expand All @@ -725,6 +754,7 @@ ContextPropertyPrivate::ContextPropertyPrivate(const QString &key)
, state_(Initial)
, is_cached_(false)
, handle_(this)
, update_queued_(ATOMIC_FLAG_INIT)
{
}

Expand Down Expand Up @@ -804,6 +834,7 @@ bool ContextPropertyPrivate::event(QEvent *e)
{
using statefs::qt::Event;
using statefs::qt::DataReplyEvent;
using statefs::qt::DataReadyEvent;
if (e->type() < QEvent::User)
return QObject::event(e);

Expand All @@ -816,8 +847,13 @@ bool ContextPropertyPrivate::event(QEvent *e)
if (p) onChanged(std::move(p->data_));
break;
}
case Event::Ready: {
auto p = EVENT_CAST(e, DataReadyEvent);
if (p) updateFromRemoteCache(p);
break;
}
default:
debug::warning("Unknown user event");
debug::warning("Unknown user event", t);
res = QObject::event(e);
}
};
Expand Down Expand Up @@ -945,6 +981,31 @@ void ContextPropertyPrivate::refresh() const
actor()->postEvent(new RefreshRequest(this->handle_, key_));
}

void ContextPropertyPrivate::attachCache(std::shared_ptr<statefs::qt::Cache> cache) const
{
// called from other thread
remote_cache_ = cache;
}

void ContextPropertyPrivate::dataReady(statefs::qt::target_handle self_handle)
{
// called from other thread
if (!update_queued_.test_and_set(std::memory_order_acquire)) {
postEvent(new statefs::qt::DataReadyEvent(self_handle));
}
}

void ContextPropertyPrivate::updateFromRemoteCache(statefs::qt::DataReadyEvent *)
{
// called from the object thread
update_queued_.clear(std::memory_order_release);
// cache is attached from an other thread, so save pointer copy
auto pcache = remote_cache_;
if (pcache)
onChanged(remote_cache_->load());
}


ContextProperty::ContextProperty(const QString &key, QObject *parent)
: QObject(parent)
, priv(new ContextPropertyPrivate(key))
Expand Down
26 changes: 24 additions & 2 deletions src/contextkit-subscriber/property.hpp
Expand Up @@ -129,6 +129,17 @@ class FileWriter : public File
bool write(QByteArray const &);
};

class Cache {
public:
Cache() {}

void store(QVariant v);
QVariant load() const;
private:
mutable QMutex mutex_;
QVariant data_;
};

class Property : public QObject
{
Q_OBJECT;
Expand All @@ -154,14 +165,14 @@ private slots:
bool tryOpen();
void resubscribe();
QVariant subscribe_();
void changed(QVariant const&) const;
void changed() const;

FileReader file_;
QByteArray buffer_;
mutable int reopen_interval_;
mutable QTimer *reopen_timer_;
bool is_subscribed_;
QVariant cache_;
std::shared_ptr<Cache> cache_;
QSet<target_handle> targets_;
};

Expand Down Expand Up @@ -192,6 +203,7 @@ class PropertyMonitor : public QObject
};

class ReplyEvent;
class DataReadyEvent;

}}

Expand Down Expand Up @@ -232,6 +244,7 @@ class ContextPropertyPrivate : public QObject

friend class ContextProperty;
friend class ContextPropertyPrivateHandle;

void detach();
void onChanged(QVariant) const;

Expand All @@ -253,6 +266,15 @@ class ContextPropertyPrivate : public QObject
mutable std::future<QVariant> on_subscribed_;
mutable std::future<void> on_unsubscribed_;
mutable QSharedPointer<ContextPropertyPrivate> handle_;

// TODO move functionality to the separate interface
friend class statefs::qt::Property;
void attachCache(std::shared_ptr<statefs::qt::Cache>) const;
void dataReady(statefs::qt::target_handle);
void updateFromRemoteCache(statefs::qt::DataReadyEvent *);

mutable std::shared_ptr<statefs::qt::Cache> remote_cache_;
mutable std::atomic_flag update_queued_;
};

#endif // _STATEFS_CKIT_PROPERTY_HPP_

0 comments on commit ee4364e

Please sign in to comment.