migration view and very simple first test added.

svn path=/plone.app.discussion/trunk/; revision=27894
This commit is contained in:
Timo Stollenwerk 2009-07-07 16:16:58 +00:00
parent 360a758905
commit 88cbd01379
3 changed files with 129 additions and 0 deletions

View File

@ -8,6 +8,15 @@
<!-- Traversal adapter -->
<adapter factory=".traversal.ConversationNamespace" name="conversation" />
<!-- Migration view -->
<browser:page
for="Products.CMFCore.interfaces.ISiteRoot"
name="comment-migration"
layer="..interfaces.IDiscussionLayer"
class=".migration.View"
permission="cmf.ManagePortal"
/>
<!-- Moderation view -->
<browser:page
for="Products.CMFCore.interfaces.ISiteRoot"

View File

@ -0,0 +1,62 @@
from Acquisition import aq_inner, aq_parent
from Products.Five.browser import BrowserView
from Products.Five.browser.pagetemplatefile import ViewPageTemplateFile
from Products.CMFCore.utils import getToolByName
from Products.CMFPlone import PloneMessageFactory as _
from Products.statusmessages.interfaces import IStatusMessage
from Products.CMFCore.interfaces import IContentish
from zope.component import createObject
from plone.app.discussion.interfaces import IConversation
class View(BrowserView):
"""Migration View
"""
def __call__(self):
context = aq_inner(self.context)
out = []
def log(msg):
context.plone_log(msg)
out.append(msg)
log("Comment migration started.")
# Find content
catalog = getToolByName(context, 'portal_catalog')
dtool = context.portal_discussion
brains = catalog.searchResults(
object_provides='Products.CMFCore.interfaces._content.IContentish')
log("Found %s content object to migrate." % len(brains))
for brain in brains:
if brain.portal_type != 'Discussion Item':
old_comments = []
obj = brain.getObject()
talkback = getattr( obj, 'talkback', None )
if talkback:
replies = talkback.objectValues()
log("%s: Found talkback with %s comments to migrate"\
% (obj.absolute_url(relative=1), len(replies)))
for reply in replies:
old_comments.append(reply)
# Build up new conversation/comments structure
conversation = IConversation(obj)
for old_comment in old_comments:
comment = createObject('plone.Comment')
comment.title = old_comment.Title()
comment.text = old_comment.text
conversation.addComment(comment)
log("Comment migration finished.")
return out

View File

@ -0,0 +1,58 @@
import unittest
from zope.annotation.interfaces import IAnnotations
from Products.CMFCore.utils import getToolByName
from Products.PloneTestCase.ptc import PloneTestCase
from plone.app.discussion.tests.layer import DiscussionLayer
from plone.app.discussion.browser.migration import View
from plone.app.discussion.interfaces import IConversation, IComment
class MigrationTest(PloneTestCase):
def afterSetUp(self):
self.loginAsPortalOwner()
typetool = self.portal.portal_types
typetool.constructContent('Document', self.portal, 'doc')
# Create a document
self.discussion = getToolByName(self.portal, 'portal_discussion', None)
self.discussion.overrideDiscussionFor(self.portal.doc, 1)
# Publish it
self.workflow = self.portal.portal_workflow
self.workflow.doActionFor(self.portal.doc, 'publish')
request = self.app.REQUEST
context = getattr(self.portal, 'doc')
self.view = View(context, request)
self.workflow.setChainForPortalTypes(('Discussion Item',), 'comment_review_workflow')
def test_migrate_comment(self):
# Create one comment
self.discussion.getDiscussionFor(self.portal.doc)
self.portal.doc.talkback.createReply('My Title', 'My Text')
reply = self.portal.doc.talkback.objectValues()[0]
self.assertEqual(reply.Title(), 'My Title')
self.assertEqual(reply.EditableBody(), 'My Text')
# Call migration script
self.view()
# Make sure a conversation has been created
self.failUnless('plone.app.discussion:conversation' in IAnnotations(self.portal.doc))
conversation = IConversation(self.portal.doc)
# Check migration
self.assertEquals(conversation.total_comments, 1)
self.failUnless(conversation.getComments().next())
self.assert_(IComment.providedBy(conversation.getComments().next()))
self.assertEquals(conversation.getComments().next().Title(), 'My Title')
self.assertEquals(conversation.getComments().next().text, 'My Text')
def test_suite():
return unittest.defaultTestLoader.loadTestsFromName(__name__)